blob: 69a2a884cdc2171124605b529e86a4c688effd4d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed;
import java.util.Collection;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import javax.cache.Cache;
import javax.cache.configuration.Factory;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.CacheStoreSession;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.resources.CacheStoreSessionResource;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.testframework.MvccFeatureChecker;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.transactions.Transaction;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
/**
*
*/
public class IgniteCrossCacheTxStoreSelfTest extends GridCommonAbstractTest {
/** */
private static Map<String, CacheStore> firstStores = new ConcurrentHashMap<>();
/** */
private static Map<String, CacheStore> secondStores = new ConcurrentHashMap<>();
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_STORE);
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
CacheConfiguration cfg1 = cacheConfiguration("cacheA", new FirstStoreFactory());
CacheConfiguration cfg2 = cacheConfiguration("cacheB", new FirstStoreFactory());
CacheConfiguration cfg3 = cacheConfiguration("cacheC", new SecondStoreFactory());
CacheConfiguration cfg4 = cacheConfiguration("cacheD", null);
cfg.setCacheConfiguration(cfg4, cfg2, cfg3, cfg1);
return cfg;
}
/**
* @param cacheName Cache name.
* @param factory Factory to use.
* @return Cache configuration.
*/
private CacheConfiguration cacheConfiguration(String cacheName, Factory<CacheStore> factory) {
CacheConfiguration cfg = defaultCacheConfiguration();
cfg.setNearConfiguration(null);
cfg.setName(cacheName);
cfg.setBackups(1);
if (factory != null) {
cfg.setCacheStoreFactory(factory);
cfg.setWriteThrough(true);
}
return cfg;
}
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
MvccFeatureChecker.skipIfNotSupported(MvccFeatureChecker.Feature.CACHE_STORE);
super.beforeTestsStarted();
cleanPersistenceDir();
startGrids(4);
}
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
firstStores.clear();
secondStores.clear();
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
grid(0).cache("cacheA").removeAll();
grid(0).cache("cacheB").removeAll();
grid(0).cache("cacheC").removeAll();
for (CacheStore store : firstStores.values())
((TestStore)store).clear();
for (CacheStore store : secondStores.values())
((TestStore)store).clear();
}
/**
* @throws Exception If failed.
*/
@Test
public void testSameStore() throws Exception {
IgniteEx grid = grid(0);
TestStore firstStore = (TestStore)firstStores.get(grid.name());
TestStore secondStore = (TestStore)secondStores.get(grid.name());
assertNotNull(firstStore);
assertNotNull(secondStore);
Collection<String> firstStoreEvts = firstStore.events();
Collection<String> secondStoreEvts = secondStore.events();
try (Transaction tx = grid.transactions().txStart()) {
IgniteCache<Object, Object> cacheA = grid.cache("cacheA");
IgniteCache<Object, Object> cacheB = grid.cache("cacheB");
cacheA.put("1", "1");
cacheA.put("2", "2");
cacheB.put("1", "1");
cacheB.put("2", "2");
cacheA.remove("3");
cacheA.remove("4");
cacheB.remove("3");
cacheB.remove("4");
cacheA.put("5", "5");
cacheA.remove("6");
cacheB.put("7", "7");
tx.commit();
}
assertEqualsCollections(F.asList(
"writeAll cacheA 2",
"writeAll cacheB 2",
"deleteAll cacheA 2",
"deleteAll cacheB 2",
"write cacheA",
"delete cacheA",
"write cacheB",
"sessionEnd true"
),
firstStoreEvts);
assertEquals(0, secondStoreEvts.size());
}
/**
* @throws Exception If failed.
*/
@Test
public void testDifferentStores() throws Exception {
IgniteEx grid = grid(0);
TestStore firstStore = (TestStore)firstStores.get(grid.name());
TestStore secondStore = (TestStore)secondStores.get(grid.name());
assertNotNull(firstStore);
assertNotNull(secondStore);
Collection<String> firstStoreEvts = firstStore.events();
Collection<String> secondStoreEvts = secondStore.events();
try (Transaction tx = grid.transactions().txStart()) {
IgniteCache<Object, Object> cacheA = grid.cache("cacheA");
IgniteCache<Object, Object> cacheC = grid.cache("cacheC");
cacheA.put("1", "1");
cacheA.put("2", "2");
cacheC.put("1", "1");
cacheC.put("2", "2");
cacheA.remove("3");
cacheA.remove("4");
cacheC.remove("3");
cacheC.remove("4");
cacheA.put("5", "5");
cacheA.remove("6");
cacheC.put("7", "7");
tx.commit();
}
assertEqualsCollections(F.asList(
"writeAll cacheA 2",
"deleteAll cacheA 2",
"write cacheA",
"delete cacheA",
"sessionEnd true"
),
firstStoreEvts);
assertEqualsCollections(F.asList(
"writeAll cacheC 2",
"deleteAll cacheC 2",
"write cacheC",
"sessionEnd true"
),
secondStoreEvts);
}
/**
* @throws Exception If failed.
*/
@Test
public void testNonPersistentCache() throws Exception {
IgniteEx grid = grid(0);
TestStore firstStore = (TestStore)firstStores.get(grid.name());
TestStore secondStore = (TestStore)secondStores.get(grid.name());
assertNotNull(firstStore);
assertNotNull(secondStore);
Collection<String> firstStoreEvts = firstStore.events();
Collection<String> secondStoreEvts = secondStore.events();
try (Transaction tx = grid.transactions().txStart()) {
IgniteCache<Object, Object> cacheA = grid.cache("cacheA");
IgniteCache<Object, Object> cacheD = grid.cache("cacheD");
cacheA.put("1", "1");
cacheA.put("2", "2");
cacheD.put("1", "1");
cacheD.put("2", "2");
cacheA.remove("3");
cacheA.remove("4");
cacheD.remove("3");
cacheD.remove("4");
cacheA.put("5", "5");
cacheA.remove("6");
cacheD.put("7", "7");
tx.commit();
}
assertEqualsCollections(F.asList(
"writeAll cacheA 2",
"deleteAll cacheA 2",
"write cacheA",
"delete cacheA",
"sessionEnd true"
),
firstStoreEvts);
assertEquals(0, secondStoreEvts.size());
}
/**
*
*/
private static class TestStore implements CacheStore<Object, Object> {
/** */
private Queue<String> evts = new ConcurrentLinkedDeque<>();
/** Auto-injected store session. */
@CacheStoreSessionResource
private CacheStoreSession ses;
/**
*
*/
public void clear() {
evts.clear();
}
/**
* @return Collection of recorded events.
*/
public Collection<String> events() {
return evts;
}
/** {@inheritDoc} */
@Override public void loadCache(IgniteBiInClosure<Object, Object> clo, @Nullable Object... args)
throws CacheLoaderException {
}
/** {@inheritDoc} */
@Override public void sessionEnd(boolean commit) throws CacheWriterException {
evts.offer("sessionEnd " + commit);
}
/** {@inheritDoc} */
@Override public Object load(Object key) throws CacheLoaderException {
return null;
}
/** {@inheritDoc} */
@Override public Map<Object, Object> loadAll(Iterable<?> keys) throws CacheLoaderException {
return null;
}
/** {@inheritDoc} */
@Override public void write(Cache.Entry<?, ?> entry) throws CacheWriterException {
CacheStoreSession ses = session();
String cacheName = ses.cacheName();
evts.add("write " + cacheName);
}
/** {@inheritDoc} */
@Override public void writeAll(Collection<Cache.Entry<?, ?>> entries) throws CacheWriterException {
String cacheName = session().cacheName();
evts.add("writeAll " + cacheName + " " + entries.size());
}
/** {@inheritDoc} */
@Override public void delete(Object key) throws CacheWriterException {
String cacheName = session().cacheName();
evts.add("delete " + cacheName);
}
/** {@inheritDoc} */
@Override public void deleteAll(Collection<?> keys) throws CacheWriterException {
String cacheName = session().cacheName();
evts.add("deleteAll " + cacheName + " " + keys.size());
}
/**
* @return Store session.
*/
private CacheStoreSession session() {
return ses;
}
}
/**
*
*/
private static class FirstStoreFactory implements Factory<CacheStore> {
/** */
@IgniteInstanceResource
private Ignite ignite;
/** {@inheritDoc} */
@Override public synchronized CacheStore create() {
String igniteInstanceName = ignite.name();
return firstStores.computeIfAbsent(igniteInstanceName, (k) -> new TestStore());
}
}
/**
*
*/
private static class SecondStoreFactory implements Factory<CacheStore> {
/** */
@IgniteInstanceResource
private Ignite ignite;
/** {@inheritDoc} */
@Override public CacheStore create() {
String igniteInstanceName = ignite.name();
return secondStores.computeIfAbsent(igniteInstanceName, (k) -> new TestStore());
}
}
}