blob: 8df4545900fe1e04c6c7eec72d992967bed4a629 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.integration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import javax.cache.Cache;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.CacheStoreSession;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.processors.cache.IgniteCacheAbstractTest;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.resources.CacheStoreSessionResource;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
/**
*
*/
public abstract class IgniteCacheStoreSessionAbstractTest extends IgniteCacheAbstractTest {
/** */
protected static volatile List<ExpectedData> expData;
/** */
protected static final String CACHE_NAME1 = "cache1";
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
TestStore store = new TestStore(); // Use the same store instance for both caches.
assert cfg.getCacheConfiguration().length == 1;
CacheConfiguration ccfg0 = cfg.getCacheConfiguration()[0];
ccfg0.setReadThrough(true);
ccfg0.setWriteThrough(true);
ccfg0.setCacheStoreFactory(singletonFactory(store));
CacheConfiguration ccfg1 = cacheConfiguration(igniteInstanceName);
ccfg1.setReadThrough(true);
ccfg1.setWriteThrough(true);
ccfg1.setName(CACHE_NAME1);
ccfg1.setCacheStoreFactory(singletonFactory(store));
cfg.setCacheConfiguration(ccfg0, ccfg1);
return cfg;
}
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
super.afterTestsStopped();
expData = null;
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
expData = Collections.synchronizedList(new ArrayList<ExpectedData>());
super.beforeTestsStarted();
}
/**
* @param cache Cache.
* @param cnt Keys count.
* @return Keys.
* @throws Exception If failed.
*/
protected List<Integer> testKeys(IgniteCache cache, int cnt) throws Exception {
return primaryKeys(cache, cnt, 0);
}
/**
* @throws Exception If failed.
*/
@Test
public void testStoreSession() throws Exception {
assertEquals(DEFAULT_CACHE_NAME, jcache(0).getName());
assertEquals(CACHE_NAME1, ignite(0).cache(CACHE_NAME1).getName());
testStoreSession(jcache(0));
testStoreSession(ignite(0).cache(CACHE_NAME1));
}
/**
* @param cache Cache.
* @throws Exception If failed.
*/
private void testStoreSession(IgniteCache<Object, Object> cache) throws Exception {
Set<Integer> keys = new HashSet<>(primaryKeys(cache, 3, 100_000));
Integer key = keys.iterator().next();
boolean tx = atomicityMode() == TRANSACTIONAL;
expData.add(new ExpectedData(false, "load", new TreeMap<>(), cache.getName()));
assertEquals(key, cache.get(key));
assertTrue(expData.isEmpty());
expData.add(new ExpectedData(false, "loadAll", new TreeMap<>(), cache.getName()));
assertEquals(3, cache.getAll(keys).size());
assertTrue(expData.isEmpty());
expectedData(tx, "write", cache.getName());
cache.put(key, key);
assertTrue(expData.isEmpty());
expectedData(tx, "write", cache.getName());
cache.invoke(key, new EntryProcessor<Object, Object, Object>() {
@Override public Object process(MutableEntry<Object, Object> e, Object... args) {
e.setValue("val1");
return null;
}
});
assertTrue(expData.isEmpty());
expectedData(tx, "delete", cache.getName());
cache.remove(key);
assertTrue(expData.isEmpty());
Map<Object, Object> vals = new TreeMap<>();
for (Object key0 : keys)
vals.put(key0, key0);
expectedData(tx, "writeAll", cache.getName());
cache.putAll(vals);
assertTrue(expData.isEmpty());
expectedData(tx, "deleteAll", cache.getName());
cache.removeAll(keys);
assertTrue(expData.isEmpty());
expectedData(false, "loadCache", cache.getName());
cache.localLoadCache(null);
assertTrue(expData.isEmpty());
}
/**
* @param tx {@code True} is transaction is expected.
* @param expMtd Expected method.
* @param expCacheName Expected cache name.
*/
private void expectedData(boolean tx, String expMtd, String expCacheName) {
expData.add(new ExpectedData(tx, expMtd, new TreeMap<>(), expCacheName));
if (tx)
expData.add(new ExpectedData(true, "sessionEnd", F.<Object, Object>asMap(0, expMtd), expCacheName));
}
/**
*
*/
static class ExpectedData {
/** */
private final boolean tx;
/** */
private final String expMtd;
/** */
private final Map<Object, Object> expProps;
/** */
private final String expCacheName;
/**
* @param tx {@code True} if transaction is enabled.
* @param expMtd Expected method.
* @param expProps Expected properties.
* @param expCacheName Expected cache name.
*/
ExpectedData(boolean tx, String expMtd, Map<Object, Object> expProps, @NotNull String expCacheName) {
this.tx = tx;
this.expMtd = expMtd;
this.expProps = expProps;
this.expCacheName = expCacheName;
}
}
/**
*
*/
private static class AbstractStore {
/** */
@CacheStoreSessionResource
protected CacheStoreSession sesInParent;
}
/**
*
*/
private class TestStore extends AbstractStore implements CacheStore<Object, Object> {
/** Auto-injected store session. */
@CacheStoreSessionResource
private CacheStoreSession ses;
/** */
@IgniteInstanceResource
protected Ignite ignite;
/** {@inheritDoc} */
@Override public void loadCache(IgniteBiInClosure<Object, Object> clo, @Nullable Object... args) {
log.info("Load cache [tx=" + session().transaction() + ']');
checkSession("loadCache");
}
/** {@inheritDoc} */
@Override public void sessionEnd(boolean commit) throws CacheWriterException {
if (session().isWithinTransaction()) {
log.info("Tx end [commit=" + commit + ", tx=" + session().transaction() + ']');
checkSession("sessionEnd");
}
}
/** {@inheritDoc} */
@Override public Object load(Object key) throws CacheLoaderException {
log.info("Load [key=" + key + ", tx=" + session().transaction() + ']');
checkSession("load");
return key;
}
/** {@inheritDoc} */
@Override public Map<Object, Object> loadAll(Iterable<?> keys) throws CacheLoaderException {
log.info("LoadAll [keys=" + keys + ", tx=" + session().transaction() + ']');
checkSession("loadAll");
Map<Object, Object> loaded = new TreeMap<>();
for (Object key : keys)
loaded.put(key, key);
return loaded;
}
/** {@inheritDoc} */
@Override public void write(Cache.Entry<?, ?> entry) throws CacheWriterException {
log.info("Write [write=" + entry + ", tx=" + session().transaction() + ']');
checkSession("write");
}
/** {@inheritDoc} */
@Override public void writeAll(Collection<Cache.Entry<?, ?>> entries) throws CacheWriterException {
log.info("WriteAll: [writeAll=" + entries + ", tx=" + session().transaction() + ']');
checkSession("writeAll");
}
/** {@inheritDoc} */
@Override public void delete(Object key) throws CacheWriterException {
log.info("Delete [key=" + key + ", tx=" + session().transaction() + ']');
checkSession("delete");
}
/** {@inheritDoc} */
@Override public void deleteAll(Collection<?> keys) throws CacheWriterException {
log.info("DeleteAll [keys=" + keys + ", tx=" + session().transaction() + ']');
checkSession("deleteAll");
}
/**
* @return Store session.
*/
private CacheStoreSession session() {
return ses;
}
/**
* @param mtd Called stored method.
*/
private void checkSession(String mtd) {
assertNotNull(ignite);
assertFalse(expData.isEmpty());
ExpectedData exp = expData.remove(0);
assertEquals(exp.expMtd, mtd);
CacheStoreSession ses = session();
assertNotNull(ses);
assertSame(ses, sesInParent);
if (exp.tx)
assertNotNull(ses.transaction());
else
assertNull(ses.transaction());
Map<Object, Object> props = ses.properties();
assertNotNull(props);
assertEquals(exp.expProps, props);
props.put(props.size(), mtd);
assertEquals(exp.expCacheName, ses.cacheName());
}
}
}