blob: d9a2340c83f90bb5324a08972f5320f7dfb9ce38 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.datastructures;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteCluster;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSet;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.CollectionConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
import org.junit.Test;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
/**
* Cache set tests.
*/
public abstract class GridCacheSetAbstractSelfTest extends IgniteCollectionAbstractTest {
/** */
protected static final String SET_NAME = "testSet";
/** {@inheritDoc} */
@Override protected int gridCount() {
return 4;
}
/** {@inheritDoc} */
@Override protected CollectionConfiguration collectionConfiguration() {
CollectionConfiguration colCfg = super.collectionConfiguration();
if (colCfg.getCacheMode() == PARTITIONED)
colCfg.setBackups(1);
return colCfg;
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
IgniteSet<Object> set = grid(0).set(SET_NAME, null);
if (set != null)
set.close();
waitSetResourcesCleared();
assertNull(grid(0).set(SET_NAME, null));
super.afterTest();
}
/**
* Waits when internal set maps are cleared.
*
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("ErrorNotRethrown")
private void waitSetResourcesCleared() throws IgniteCheckedException {
final int MAX_CHECK = 5;
for (int i = 0; i < MAX_CHECK; i++) {
try {
assertSetResourcesCleared();
return;
}
catch (AssertionError e) {
if (i == MAX_CHECK - 1)
throw e;
log.info("Set resources not cleared, will wait more.");
U.sleep(1000);
}
}
}
/**
* Checks internal set maps are cleared.
*/
private void assertSetResourcesCleared() {
assertSetIteratorsCleared();
for (int i = 0; i < gridCount(); i++) {
IgniteKernal grid = (IgniteKernal)grid(i);
for (IgniteInternalCache cache : grid.cachesx(null)) {
CacheDataStructuresManager dsMgr = cache.context().dataStructures();
Map map = GridTestUtils.getFieldValue(dsMgr, "setsMap");
assertEquals("Set not removed [grid=" + i + ", map=" + map + ']', 0, map.size());
}
}
}
/**
* Checks internal iterators maps are cleared.
*/
private void assertSetIteratorsCleared() {
for (int i = 0; i < gridCount(); i++) {
IgniteKernal grid = (IgniteKernal)grid(i);
for (IgniteCache cache : grid.caches()) {
GridCacheQueryManager queries = grid.internalCache(cache.getName()).context().queries();
Map map = GridTestUtils.getFieldValue(queries, GridCacheQueryManager.class, "qryIters");
for (Object obj : map.values())
assertEquals("Iterators not removed for grid " + i, 0, ((Map)obj).size());
}
}
}
/** {@inheritDoc} */
@Override protected long getTestTimeout() {
return 2 * 60 * 1000;
}
/**
* @throws Exception If failed.
*/
@Test
public void testCreateRemove() throws Exception {
testCreateRemove(false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testCreateRemoveCollocated() throws Exception {
testCreateRemove(true);
}
/**
* @param collocated Collocation flag.
* @throws Exception If failed.
*/
protected void testCreateRemove(boolean collocated) throws Exception {
testCreateRemove(collocated, 0);
}
/**
* @param collocated Collocation flag.
* @param nodeIdx Index of the node from which to create set.
* @throws Exception If failed.
*/
protected void testCreateRemove(boolean collocated, int nodeIdx) throws Exception {
for (int i = 0; i < gridCount(); i++)
assertNull(grid(i).set(SET_NAME, null));
CollectionConfiguration colCfg0 = config(collocated);
IgniteSet<Integer> set0 = grid(nodeIdx).set(SET_NAME, colCfg0);
assertNotNull(set0);
for (int i = 0; i < gridCount(); i++) {
CollectionConfiguration colCfg = config(collocated);
IgniteSet<Integer> set = grid(i).set(SET_NAME, colCfg);
assertNotNull(set);
assertTrue(set.isEmpty());
assertEquals(0, set.size());
assertEquals(SET_NAME, set.name());
if (collectionCacheMode() == PARTITIONED)
assertEquals(collocated, set.collocated());
}
set0.close();
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
try {
for (int i = 0; i < gridCount(); i++) {
if (grid(i).set(SET_NAME, null) != null)
return false;
}
return true;
}
catch (Exception e) {
fail("Unexpected exception: " + e);
return true;
}
}
}, 1000);
for (int i = 0; i < gridCount(); i++)
assertNull(grid(i).set(SET_NAME, null));
}
/**
* @throws Exception If failed.
*/
@Test
public void testApi() throws Exception {
testApi(false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testApiCollocated() throws Exception {
testApi(true);
}
/**
* @param collocated Collocation flag.
* @throws Exception If failed.
*/
private void testApi(boolean collocated) throws Exception {
CollectionConfiguration colCfg = config(collocated);
IgniteSet<Object> set0 = grid(0).set(SET_NAME, colCfg);
assertNotNull(set0);
for (int i = 0; i < gridCount(); i++) {
Set<Integer> set = grid(i).set(SET_NAME, null);
assertNotNull(set);
assertFalse(set.contains(1));
assertEquals(0, set.size());
assertTrue(set.isEmpty());
}
// Add, isEmpty.
assertTrue(set0.add(1));
for (int i = 0; i < gridCount(); i++) {
Set<Integer> set = grid(i).set(SET_NAME, null);
assertEquals(1, set.size());
assertFalse(set.isEmpty());
assertTrue(set.contains(1));
assertFalse(set.add(1));
assertFalse(set.contains(100));
}
// Remove.
assertTrue(set0.remove(1));
for (int i = 0; i < gridCount(); i++) {
Set<Integer> set = grid(i).set(SET_NAME, null);
assertEquals(0, set.size());
assertTrue(set.isEmpty());
assertFalse(set.contains(1));
assertFalse(set.remove(1));
}
// Contains all.
Collection<Integer> col1 = new ArrayList<>();
Collection<Integer> col2 = new ArrayList<>();
final int ITEMS = 100;
for (int i = 0; i < ITEMS; i++) {
assertTrue(grid(i % gridCount()).set(SET_NAME, null).add(i));
col1.add(i);
col2.add(i);
}
col2.add(ITEMS);
for (int i = 0; i < gridCount(); i++) {
Set<Integer> set = grid(i).set(SET_NAME, null);
assertEquals(ITEMS, set.size());
assertTrue(set.containsAll(col1));
assertFalse(set.containsAll(col2));
}
// To array.
for (int i = 0; i < gridCount(); i++) {
Set<Integer> set = grid(i).set(SET_NAME, null);
assertArrayContent(set.toArray(), ITEMS);
assertArrayContent(set.toArray(new Integer[ITEMS]), ITEMS);
}
// Remove all.
Collection<Integer> rmvCol = new ArrayList<>();
for (int i = ITEMS - 10; i < ITEMS; i++)
rmvCol.add(i);
assertTrue(set0.removeAll(rmvCol));
for (int i = 0; i < gridCount(); i++) {
Set<Integer> set = grid(i).set(SET_NAME, null);
assertFalse(set.removeAll(rmvCol));
for (Integer val : rmvCol)
assertFalse(set.contains(val));
assertArrayContent(set.toArray(), ITEMS - 10);
assertArrayContent(set.toArray(new Integer[ITEMS - 10]), ITEMS - 10);
}
// Add all.
assertTrue(set0.addAll(rmvCol));
for (int i = 0; i < gridCount(); i++) {
Set<Integer> set = grid(i).set(SET_NAME, null);
assertEquals(ITEMS, set.size());
assertFalse(set.addAll(rmvCol));
for (Integer val : rmvCol)
assertTrue(set.contains(val));
}
// Retain all.
assertTrue(set0.retainAll(rmvCol));
for (int i = 0; i < gridCount(); i++) {
Set<Integer> set = grid(i).set(SET_NAME, null);
assertEquals(rmvCol.size(), set.size());
assertFalse(set.retainAll(rmvCol));
for (int val = 0; val < 10; val++)
assertFalse(set.contains(val));
for (int val : rmvCol)
assertTrue(set.contains(val));
}
assertFalse(set0.isEmpty());
// retainAll with empty list: clear the collection and get a boolean value indicating if it was empty or not.
assertTrue(set0.retainAll(new ArrayList<>()));
assertTrue(set0.isEmpty());
// Clear.
set0.add(1);
set0.clear();
for (int i = 0; i < gridCount(); i++) {
Set<Integer> set = grid(i).set(SET_NAME, null);
assertEquals(0, set.size());
assertTrue(set.isEmpty());
assertFalse(set.contains(0));
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testIterator() throws Exception {
testIterator(false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testIteratorCollocated() throws Exception {
testIterator(true);
}
/**
* @param collocated Collocation flag.
* @throws Exception If failed.
*/
protected void testIterator(boolean collocated) throws Exception {
testIterator(collocated, 0);
}
/**
* @param collocated Collocation flag.
* @param nodeIdx Index of the node from which to create set.
* @throws Exception If failed.
*/
protected void testIterator(boolean collocated, int nodeIdx) throws Exception {
CollectionConfiguration colCfg = config(collocated);
final IgniteSet<Integer> set0 = grid(nodeIdx).set(SET_NAME, colCfg);
for (int i = 0; i < gridCount(); i++) {
IgniteSet<Integer> set = grid(i).set(SET_NAME, null);
assertFalse(set.iterator().hasNext());
}
int cnt = 0;
for (int i = 0; i < gridCount(); i++) {
Set<Integer> set = grid(i).set(SET_NAME, null);
for (int j = 0; j < 100; j++)
assertTrue(set.add(cnt++));
}
for (int i = 0; i < gridCount(); i++) {
IgniteSet<Integer> set = grid(i).set(SET_NAME, null);
assertSetContent(set, cnt);
}
// Try to do not use hasNext.
Collection<Integer> data = new HashSet<>(cnt);
Iterator<Integer> iter = set0.iterator();
for (int i = 0; i < cnt; i++)
assertTrue(data.add(iter.next()));
assertFalse(iter.hasNext());
assertEquals(cnt, data.size());
for (int i = 0; i < cnt; i++)
assertTrue(data.contains(i));
// Iterator for empty set.
set0.clear();
for (int i = 0; i < gridCount(); i++) {
IgniteSet<Integer> set = grid(i).set(SET_NAME, null);
assertFalse(set.iterator().hasNext());
}
// Iterator.remove().
for (int i = 0; i < 10; i++)
assertTrue(set0.add(i));
iter = set0.iterator();
while (iter.hasNext()) {
Integer val = iter.next();
if (val % 2 == 0)
iter.remove();
}
for (int i = 0; i < gridCount(); i++) {
Set<Integer> set = grid(i).set(SET_NAME, null);
assertEquals(i % 2 != 0, set.contains(i));
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testIteratorClose() throws Exception {
testIteratorClose(false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testIteratorCloseCollocated() throws Exception {
testIteratorClose(true);
}
/**
* @param collocated Collocation flag.
* @throws Exception If failed.
*/
@SuppressWarnings("ErrorNotRethrown")
private void testIteratorClose(boolean collocated) throws Exception {
CollectionConfiguration colCfg = config(collocated);
IgniteSet<Integer> set0 = grid(0).set(SET_NAME, colCfg);
for (int i = 0; i < 5000; i++)
assertTrue(set0.add(i));
createIterators(set0);
System.gc();
for (int i = 0; i < 10; i++) {
try {
set0.size(); // Trigger weak queue poll.
assertSetIteratorsCleared();
}
catch (AssertionError e) {
if (i == 9)
throw e;
log.info("Set iterators not cleared, will wait");
Thread.sleep(500);
}
}
// Check iterators are closed on set remove.
createIterators(set0);
int idx = gridCount() > 1 ? 1 : 0;
grid(idx).set(SET_NAME, null).close();
for (int i = 0; i < 10; i++) {
try {
assertSetIteratorsCleared();
}
catch (AssertionError e) {
if (i == 9)
throw e;
log.info("Set iterators not cleared, will wait");
Thread.sleep(500);
}
}
}
/**
* @param set Set.
*/
private void createIterators(IgniteSet<Integer> set) {
for (int i = 0; i < 10; i++) {
Iterator<Integer> iter = set.iterator();
assertTrue(iter.hasNext());
iter.next();
assertTrue(iter.hasNext());
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testNodeJoinsAndLeaves() throws Exception {
testNodeJoinsAndLeaves(false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testNodeJoinsAndLeavesCollocated() throws Exception {
testNodeJoinsAndLeaves(true);
}
/**
* @param collocated Collocation flag.
* @throws Exception If failed.
*/
private void testNodeJoinsAndLeaves(boolean collocated) throws Exception {
CollectionConfiguration colCfg = config(collocated);
Set<Integer> set0 = grid(0).set(SET_NAME, colCfg);
final int ITEMS = 10_000;
for (int i = 0; i < ITEMS; i++)
set0.add(i);
startGrid(gridCount());
awaitPartitionMapExchange();
try {
IgniteSet<Integer> set1 = grid(0).set(SET_NAME, null);
assertNotNull(set1);
for (int i = 0; i < gridCount() + 1; i++) {
IgniteSet<Integer> set = grid(i).set(SET_NAME, null);
assertEquals(ITEMS, set.size());
assertSetContent(set, ITEMS);
}
}
finally {
stopGrid(gridCount());
}
// Stopping a node will cause data loss with zero backups.
if (colCfg.getBackups() != 0) {
for (int i = 0; i < gridCount(); i++) {
IgniteSet<Integer> set = grid(i).set(SET_NAME, null);
assertSetContent(set, ITEMS);
}
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testMultithreaded() throws Exception {
testMultithreaded(false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testMultithreadedCollocated() throws Exception {
if (collectionCacheMode() != PARTITIONED)
return;
testMultithreaded(true);
}
/**
* @param collocated Collocation flag.
* @throws Exception If failed.
*/
private void testMultithreaded(final boolean collocated) throws Exception {
CollectionConfiguration colCfg = config(collocated);
Set<Integer> set0 = grid(0).set(SET_NAME, colCfg);
assertNotNull(set0);
Collection<IgniteInternalFuture> futs = new ArrayList<>();
final int THREADS_PER_NODE = 5;
final int KEY_RANGE = 10_000;
final int ITERATIONS = GridTestUtils.SF.applyLB(2000, 100);
for (int i = 0; i < gridCount(); i++) {
final int idx = i;
futs.add(GridTestUtils.runMultiThreadedAsync(new Callable<Void>() {
@Override public Void call() throws Exception {
IgniteSet<Integer> set = grid(idx).set(SET_NAME, null);
assertNotNull(set);
ThreadLocalRandom rnd = ThreadLocalRandom.current();
for (int i = 0; i < ITERATIONS; i++) {
switch (rnd.nextInt(4)) {
case 0:
set.add(rnd.nextInt(KEY_RANGE));
break;
case 1:
set.remove(rnd.nextInt(KEY_RANGE));
break;
case 2:
set.contains(rnd.nextInt(KEY_RANGE));
break;
case 3:
for (Integer val : set)
assertNotNull(val);
break;
default:
fail();
}
if ((i + 1) % 500 == 0)
log.info("Executed iterations: " + (i + 1));
}
return null;
}
}, THREADS_PER_NODE, "testSetMultithreaded"));
}
for (IgniteInternalFuture fut : futs)
fut.get();
}
/**
* @throws Exception If failed.
*/
@Test
public void testCleanup() throws Exception {
testCleanup(false);
}
/**
* @throws Exception If failed.
*/
@Test
public void testCleanupCollocated() throws Exception {
testCleanup(true);
}
/**
* @param collocated Collocation flag.
* @throws Exception If failed.
*/
private void testCleanup(boolean collocated) throws Exception {
CollectionConfiguration colCfg = config(collocated);
final IgniteSet<Integer> set0 = grid(0).set(SET_NAME, colCfg);
assertNotNull(set0);
final Collection<Set<Integer>> sets = new ArrayList<>();
for (int i = 0; i < gridCount(); i++) {
IgniteSet<Integer> set = grid(i).set(SET_NAME, null);
assertNotNull(set);
sets.add(set);
}
Collection<Integer> items = new ArrayList<>(10_000);
for (int i = 0; i < 10_000; i++)
items.add(i);
set0.addAll(items);
assertEquals(10_000, set0.size());
final AtomicBoolean stop = new AtomicBoolean();
final AtomicInteger val = new AtomicInteger(10_000);
IgniteInternalFuture<?> fut;
try {
fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
@Override public Object call() throws Exception {
try {
while (!stop.get()) {
for (Set<Integer> set : sets)
set.add(val.incrementAndGet());
}
}
catch (IllegalStateException e) {
log.info("Set removed: " + e);
}
return null;
}
}, 5, "set-add-thread");
set0.close();
}
finally {
stop.set(true);
}
fut.get();
int cnt = 0;
GridCacheContext cctx = GridTestUtils.getFieldValue(set0, "cctx");
boolean separated = separated(set0);
if (separated)
awaitPartitionMapExchange();
for (int i = 0; i < gridCount(); i++) {
GridCacheAdapter cache = grid(i).context().cache().internalCache(cctx.name());
if (separated) {
assertNull("Cache " + cctx.name() + " was not destroyed.", cache);
continue;
}
for (Object e : cache.localEntries(new CachePeekMode[]{CachePeekMode.ALL})) {
cnt++;
log.info("Unexpected entry: " + e);
}
}
assertEquals("Found unexpected cache entries", 0, cnt);
for (final Set<Integer> set : sets) {
GridTestUtils.assertThrows(log, new Callable<Void>() {
@Override public Void call() throws Exception {
set.add(10);
return null;
}
}, IllegalStateException.class, null);
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testSerialization() throws Exception {
final IgniteSet<Integer> set = grid(0).set(SET_NAME, config(false));
assertNotNull(set);
for (int i = 0; i < 10; i++)
set.add(i);
IgniteCluster cluster = grid(0).cluster();
Collection<Integer> c = grid(0).compute(cluster).broadcast(new IgniteCallable<Integer>() {
@Override public Integer call() throws Exception {
assertEquals(SET_NAME, set.name());
return set.size();
}
});
assertEquals(gridCount(), c.size());
for (Integer size : c)
assertEquals((Integer)10, size);
}
/**
* @throws Exception If failed.
*/
@Test
public void testAffinityRun() throws Exception {
final CollectionConfiguration colCfg = collectionConfiguration();
colCfg.setCollocated(false);
colCfg.setCacheMode(CacheMode.PARTITIONED);
colCfg.setGroupName("testGroup");
try (final IgniteSet<Integer> set1 = grid(0).set("Set1", colCfg)) {
GridTestUtils.assertThrows(
log,
new Callable<Void>() {
@Override public Void call() throws Exception {
set1.affinityRun(new IgniteRunnable() {
@Override public void run() {
// No-op.
}
});
return null;
}
},
IgniteException.class,
"Failed to execute affinityRun() for non-collocated set: " + set1.name() +
". This operation is supported only for collocated sets.");
}
colCfg.setCollocated(true);
try (final IgniteSet<Integer> set2 = grid(0).set("Set2", colCfg)) {
set2.add(100);
final String cacheName = cctx(set2).name();
set2.affinityRun(new IgniteRunnable() {
@IgniteInstanceResource
private IgniteEx ignite;
@Override public void run() {
assertTrue(ignite.cachex(cacheName).affinity().isPrimaryOrBackup(
ignite.cluster().localNode(), "Set2"));
assertEquals(100, set2.iterator().next().intValue());
}
});
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testAffinityCall() throws Exception {
final CollectionConfiguration colCfg = collectionConfiguration();
colCfg.setCollocated(false);
colCfg.setCacheMode(CacheMode.PARTITIONED);
colCfg.setGroupName("testGroup");
try (final IgniteSet<Integer> set1 = grid(0).set("Set1", colCfg)) {
GridTestUtils.assertThrows(
log,
new Callable<Void>() {
@Override public Void call() throws Exception {
set1.affinityCall(new IgniteCallable<Object>() {
@Override public Object call() {
return null;
}
});
return null;
}
},
IgniteException.class,
"Failed to execute affinityCall() for non-collocated set: " + set1.name() +
". This operation is supported only for collocated sets.");
}
colCfg.setCollocated(true);
try (final IgniteSet<Integer> set2 = grid(0).set("Set2", colCfg)) {
set2.add(100);
final String cacheName = cctx(set2).name();
Integer res = set2.affinityCall(new IgniteCallable<Integer>() {
@IgniteInstanceResource
private IgniteEx ignite;
@Override public Integer call() {
assertTrue(ignite.cachex(cacheName).affinity().isPrimaryOrBackup(
ignite.cluster().localNode(), "Set2"));
return set2.iterator().next();
}
});
assertEquals(100, res.intValue());
}
}
/**
* Implementation of ignite data structures internally uses special system caches, need make sure
* that transaction on these system caches do not intersect with transactions started by user.
*
* @throws Exception If failed.
*/
@Test
public void testIsolation() throws Exception {
CollectionConfiguration colCfg = collectionConfiguration();
Ignite ignite = grid(0);
CacheConfiguration cfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
cfg.setName("myCache");
cfg.setAtomicityMode(TRANSACTIONAL);
cfg.setWriteSynchronizationMode(FULL_SYNC);
IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(cfg);
try {
IgniteSet<Integer> set0 = ignite.set(SET_NAME, colCfg);
assertNotNull(set0);
try (Transaction tx = ignite.transactions().txStart()) {
cache.put(1, 1);
Collection<Integer> items = new ArrayList<>(100);
for (int i = 0; i < 100; i++)
items.add(i);
set0.addAll(items);
tx.rollback();
}
assertEquals(0, cache.size());
assertEquals(100, set0.size());
set0.close();
}
finally {
ignite.destroyCache(cfg.getName());
}
}
/**
* Test that non collocated sets are stored in a separated cache.
*/
@Test
public void testCacheReuse() {
testCacheReuse(false);
}
/**
* Test that collocated sets within the same group and compatible configurations are stored in the same cache.
*/
@Test
public void testCacheReuseCollocated() {
testCacheReuse(true);
}
/**
* @param collocated Collocation flag.
*/
private void testCacheReuse(boolean collocated) {
Ignite ignite = grid(0);
CollectionConfiguration colCfg = collectionConfiguration().setCollocated(collocated);
colCfg.setAtomicityMode(ATOMIC);
colCfg.setGroupName("grp1");
IgniteSet set1 = ignite.set("set1", colCfg);
IgniteSet set2 = ignite.set("set2", colCfg);
assertEquals(separated(set1), cctx(set1).cacheId() != cctx(set2).cacheId());
colCfg.setAtomicityMode(TRANSACTIONAL);
IgniteSet set3 = ignite.set("set3", colCfg);
IgniteSet set4 = ignite.set("set4", colCfg);
assertEquals(separated(set3), cctx(set3).cacheId() != cctx(set4).cacheId());
assertTrue(cctx(set1).cacheId() != cctx(set3).cacheId());
assertTrue(cctx(set1).groupId() == cctx(set3).groupId());
colCfg.setGroupName("gtp2");
IgniteSet set5 = ignite.set("set5", colCfg);
IgniteSet set6 = ignite.set("set6", colCfg);
assertEquals(separated(set5), cctx(set5).cacheId() != cctx(set6).cacheId());
assertTrue(cctx(set1).groupId() != cctx(set5).groupId());
Stream.of(set1, set2, set3, set4, set5, set6).forEach(IgniteSet::close);
}
/**
* Tests that basic API works correctly when there are multiple structures in multiple groups.
*
* @throws Exception If failed.
*/
@Test
public void testMultipleStructuresInDifferentGroups() throws Exception {
Ignite ignite = grid(0);
CollectionConfiguration cfg1 = collectionConfiguration();
CollectionConfiguration cfg2 = collectionConfiguration().setGroupName("grp2");
IgniteSet<String> set1 = ignite.set("set1", cfg1);
IgniteSet<String> set2 = ignite.set("set2", cfg1);
IgniteSet<String> set3 = ignite.set("set3", cfg2);
IgniteSet<String> set4 = ignite.set("set4", cfg2);
assertTrue(set1.add("a"));
assertTrue(set2.add("b"));
assertTrue(set3.add("c"));
assertTrue(set4.add("d"));
assertFalse(set1.add("a"));
assertFalse(set2.add("b"));
assertFalse(set3.add("c"));
assertFalse(set4.add("d"));
assertTrue(set1.contains("a"));
assertTrue(set2.contains("b"));
assertTrue(set3.contains("c"));
assertTrue(set4.contains("d"));
assertEquals(1, set1.size());
assertEquals(1, set2.size());
assertEquals(1, set3.size());
assertEquals(1, set4.size());
assertFalse(set1.remove("z"));
assertFalse(set2.remove("z"));
assertFalse(set3.remove("z"));
assertFalse(set4.remove("z"));
assertTrue(set1.remove("a"));
assertTrue(set2.remove("b"));
assertTrue(set3.remove("c"));
assertTrue(set4.remove("d"));
assertTrue(set1.isEmpty());
assertTrue(set2.isEmpty());
assertTrue(set3.isEmpty());
assertTrue(set4.isEmpty());
set2.close();
set4.close();
assertTrue(set2.removed());
assertTrue(set4.removed());
assertFalse(set1.removed());
assertFalse(set3.removed());
assertNotNull(ignite.set("set1", null));
assertNull(ignite.set("set2", null));
set1.close();
set3.close();
}
/**
* Tests that new set with the same name as an old removed set does not contain old data.
*/
@Test
@SuppressWarnings("ThrowableNotThrown")
public void testCloseAndCreateWithSameName() {
Ignite ignite = grid(0);
CollectionConfiguration cfg = collectionConfiguration();
IgniteSet<Integer> oldSet = ignite.set("testRemoveAndCreateWithSameName", cfg);
IgniteSet<Integer> oldSet2 = ignite.set(oldSet.name(), null);
oldSet.add(1);
oldSet.close();
IgniteSet<Integer> newSet = ignite.set(oldSet.name(), cfg);
assertEquals(0, newSet.size());
assertTrue(oldSet.removed());
assertTrue(oldSet2.removed());
String msg = "Set has been removed from cache";
GridTestUtils.assertThrows(null, oldSet::size, IllegalStateException.class, msg);
GridTestUtils.assertThrows(null, oldSet2::size, IllegalStateException.class, msg);
newSet.close();
}
/**
* Tests multiple sets with the same name but different cache options.
*/
@Test
public void testSameNameDifferentOptions() {
Ignite ignite = grid(0);
String name = "testSameNameDifferentOptions";
CollectionConfiguration cfg1 = new CollectionConfiguration()
.setGroupName("gp1");
CollectionConfiguration cfg2 = new CollectionConfiguration()
.setGroupName("gp1")
.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
IgniteSet<Integer> set1 = ignite.set(name, cfg1);
IgniteSet<Integer> set1_1 = ignite.set(name, cfg1);
IgniteSet<Integer> set2 = ignite.set(name, cfg2);
IgniteSet<Integer> set2_2 = ignite.set(name, cfg2);
set1.add(1);
assertEquals(1, set1.size());
assertEquals(1, set1_1.size());
assertEquals(0, set2.size());
assertEquals(0, set2_2.size());
set1.close();
set2.close();
}
/**
* @param set Set.
* @param size Expected size.
*/
private void assertSetContent(IgniteSet<Integer> set, int size) {
Collection<Integer> data = new HashSet<>(size);
for (Integer val : set)
assertTrue(data.add(val));
assertEquals(size, data.size());
for (int val = 0; val < size; val++)
assertTrue(data.contains(val));
}
/**
* @param arr Array.
* @param size Expected size.
*/
private void assertArrayContent(Object[] arr, int size) {
assertEquals(size, arr.length);
for (int i = 0; i < size; i++) {
boolean found = false;
for (Object obj : arr) {
if (obj.equals(i)) {
found = true;
break;
}
}
assertTrue(found);
}
}
}