// blob: 39dcd310a99d8ccdeeb6059585b384aa1ac66d41 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.spi.metric.LongMetric;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.MvccFeatureChecker;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Assume;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.cacheMetricsRegistryName;
/**
 * Load test that applies random puts and removes to a transactional partitioned cache while nodes
 * join and leave the topology. After every topology change the cache contents are verified and the
 * test waits until the {@code Tombstones} cache metric drops to zero on every node.
 */
public class CacheRemoveWithTombstonesLoadTest extends GridCommonAbstractTest {
    /** How long (ms) random updates are applied while a node is being started or stopped. */
    private static final long LOAD_DURATION = 2500;

    /** Number of random cache operations performed between two pauses. */
    private static final int OPS_PER_BATCH = 100;

    /** Pause (ms) between two batches of random cache operations. */
    private static final long BATCH_PAUSE = 10;

    /** Timeout (ms) used for the node start/stop futures and for the tombstone cleanup wait. */
    private static final long WAIT_TIMEOUT = 30_000;

    /** Whether persistence is enabled for the default data region. */
    private boolean persistence;

    /**
     * {@inheritDoc}
     * <p>
     * Uses a 1 KB page size. When {@link #persistence} is set, the default data region is limited
     * to 100 MB with persistence enabled and the WAL mode is {@link WALMode#LOG_ONLY}.
     */
    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(gridName);

        DataStorageConfiguration dsCfg = new DataStorageConfiguration();

        if (persistence) {
            dsCfg.setDefaultDataRegionConfiguration(
                new DataRegionConfiguration().setMaxSize(100L * 1024 * 1024).setPersistenceEnabled(true))
                .setWalMode(WALMode.LOG_ONLY);
        }

        // Keys and values are generated with sizes of up to three pages (see removeAndRebalance()).
        dsCfg.setPageSize(1024);

        cfg.setDataStorageConfiguration(dsCfg);

        return cfg;
    }

    /** {@inheritDoc} */
    @Override protected void beforeTestsStarted() throws Exception {
        super.beforeTestsStarted();

        // The whole class is skipped when MVCC mode is forced.
        Assume.assumeFalse(MvccFeatureChecker.forcedMvcc());
    }

    /** {@inheritDoc} */
    @Override protected void beforeTest() throws Exception {
        super.beforeTest();

        cleanPersistenceDir();
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        stopAllGrids();

        cleanPersistenceDir();

        super.afterTest();
    }

    /**
     * Runs the scenario without persistence.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRemoveAndRebalance() throws Exception {
        removeAndRebalance();
    }

    /**
     * Runs the scenario with persistence enabled.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testRemoveAndRebalanceWithPersistence() throws Exception {
        persistence = true;

        // Call the shared scenario directly rather than going through another @Test method.
        removeAndRebalance();
    }

    /**
     * Test scenario: fills the cache from node 0, then starts additional nodes one by one and
     * afterwards stops them one by one (in reverse order). Random puts and removes are applied
     * through node 0 during every topology change, followed by data verification and a wait for
     * tombstone cleanup.
     *
     * @throws Exception If failed.
     */
    private void removeAndRebalance() throws Exception {
        IgniteEx ignite0 = startGrid(0);

        if (persistence)
            ignite0.cluster().active(true);

        final int pageSize = ignite0.configuration().getDataStorageConfiguration().getPageSize();

        assert pageSize > 0;

        IgniteCache<TestKey, TestValue> cache0 = ignite0.createCache(cacheConfiguration());

        ThreadLocalRandom rnd = ThreadLocalRandom.current();

        // A smaller data set and fewer extra nodes are used when persistence is enabled.
        final int keysCnt = persistence ? 5_000 : 10_000;
        final int addNodes = persistence ? 2 : 3;

        List<TestKey> keys = new ArrayList<>(keysCnt);

        // Expected cache contents; updated together with every cache operation.
        Map<TestKey, TestValue> data = new HashMap<>();

        for (int i = 0; i < keysCnt; i++) {
            TestKey key = new TestKey(i, new byte[rnd.nextInt(pageSize * 3)]);

            keys.add(key);

            data.put(key, new TestValue(new byte[rnd.nextInt(pageSize * 3)]));
        }

        cache0.putAll(data);

        final AtomicInteger nodeIdx = new AtomicInteger();

        // Phase 1: start nodes 1..addNodes, one per iteration, under load.
        for (int iter = 0; iter < addNodes; iter++) {
            IgniteInternalFuture<?> startFut = GridTestUtils.runAsync(new Callable<Object>() {
                @Override public Object call() throws Exception {
                    int idx = nodeIdx.incrementAndGet();

                    info("Start node: " + idx);

                    return startGrid(idx);
                }
            });

            loadAndVerify(startFut, cache0, keys, data, pageSize);
        }

        awaitPartitionMapExchange();

        // Phase 2: stop the added nodes in reverse order (addNodes..1), one per iteration, under load.
        for (int iter = 0; iter < addNodes; iter++) {
            IgniteInternalFuture<?> stopFut = GridTestUtils.runAsync(new Callable<Object>() {
                @Override public Object call() throws Exception {
                    int idx = nodeIdx.getAndDecrement();

                    info("Stop node: " + idx);

                    stopGrid(idx);

                    awaitPartitionMapExchange();

                    return null;
                }
            });

            loadAndVerify(stopFut, cache0, keys, data, pageSize);
        }
    }

    /**
     * Applies random puts and removes for {@link #LOAD_DURATION} ms while the given topology change
     * is in progress, waits for that change to complete and then verifies the cache contents both
     * before and after tombstone cleanup.
     *
     * @param topChangeFut Future of the concurrent node start or stop.
     * @param cache Cache to update.
     * @param keys All keys used by the test.
     * @param data Expected data; kept in sync with every cache operation.
     * @param pageSize Page size; new values get a random size below {@code pageSize * 3}.
     * @throws Exception If failed.
     */
    private void loadAndVerify(
        IgniteInternalFuture<?> topChangeFut,
        IgniteCache<TestKey, TestValue> cache,
        List<TestKey> keys,
        Map<TestKey, TestValue> data,
        int pageSize
    ) throws Exception {
        ThreadLocalRandom rnd = ThreadLocalRandom.current();

        long endTime = System.currentTimeMillis() + LOAD_DURATION;

        while (System.currentTimeMillis() < endTime) {
            for (int i = 0; i < OPS_PER_BATCH; i++) {
                TestKey key = keys.get(rnd.nextInt(keys.size()));

                if (rnd.nextBoolean()) {
                    cache.remove(key);

                    data.remove(key);
                }
                else {
                    TestValue val = new TestValue(new byte[rnd.nextInt(pageSize * 3)]);

                    cache.put(key, val);

                    data.put(key, val);
                }
            }

            // Pause once per batch (not per operation) so both phases generate the same load.
            Thread.sleep(BATCH_PAUSE);
        }

        topChangeFut.get(WAIT_TIMEOUT);

        checkData(keys, data);

        waitTombstoneCleanup();

        checkData(keys, data);
    }

    /**
     * Reads every key through every running node and compares the result with the expected data.
     *
     * @param keys Keys to check.
     * @param data Expected data; a key absent from this map must be absent from the cache.
     */
    private void checkData(List<TestKey> keys, Map<TestKey, TestValue> data) {
        List<Ignite> nodes = Ignition.allGrids();

        // Guard against a vacuous check that verifies nothing.
        assertFalse("No running nodes to check", nodes.isEmpty());

        for (Ignite node : nodes) {
            info("Check node: " + node.name());

            IgniteCache<TestKey, TestValue> cache = node.cache(DEFAULT_CACHE_NAME);

            for (TestKey key : keys) {
                TestValue expVal = data.get(key);
                TestValue val = cache.get(key);

                String where = " [key=" + key + ", node=" + node.name() + ']';

                if (expVal == null)
                    assertNull("Unexpected value for removed key" + where, val);
                else {
                    assertNotNull("Missing value" + where, val);
                    assertTrue("Unexpected value content" + where, Arrays.equals(expVal.dummyData, val.dummyData));
                }
            }
        }
    }

    /**
     * Waits, for every running node in turn, until the {@code Tombstones} metric of the default
     * cache becomes zero and fails if it does not happen within {@link #WAIT_TIMEOUT} ms.
     *
     * @throws Exception If failed.
     */
    private void waitTombstoneCleanup() throws Exception {
        for (Ignite node : Ignition.allGrids()) {
            final LongMetric tombstones = ((IgniteEx)node).context().metric().registry(
                cacheMetricsRegistryName(DEFAULT_CACHE_NAME, false)).findMetric("Tombstones");

            assertNotNull("Tombstones metric not found: " + node.name(), tombstones);

            // The returned flag is not used: the assertion below reports the actual metric value on timeout.
            GridTestUtils.waitForCondition(new GridAbsPredicate() {
                @Override public boolean apply() {
                    return tombstones.value() == 0;
                }
            }, WAIT_TIMEOUT);

            assertEquals("Failed to wait for tombstone cleanup: " + node.name(), 0, tombstones.value());
        }
    }

    /**
     * @return Configuration of the default cache: transactional, partitioned, one backup,
     *      asynchronous rebalancing and {@code FULL_SYNC} write synchronization.
     */
    private CacheConfiguration<TestKey, TestValue> cacheConfiguration() {
        CacheConfiguration<TestKey, TestValue> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);

        ccfg.setAtomicityMode(TRANSACTIONAL);
        ccfg.setCacheMode(PARTITIONED);
        ccfg.setBackups(1);
        ccfg.setRebalanceMode(ASYNC);
        ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);

        return ccfg;
    }

    /**
     * Cache key consisting of an integer ID and a byte array of arbitrary size.
     * Both fields take part in {@link #equals(Object)} and {@link #hashCode()}.
     */
    static class TestKey {
        /** Key ID. */
        private final int id;

        /** Dummy payload; never modified after construction. */
        private final byte[] dummyData;

        /**
         * @param id ID.
         * @param dummyData Dummy byte array (to test with various key sizes).
         */
        public TestKey(int id, byte[] dummyData) {
            this.id = id;
            this.dummyData = dummyData;
        }

        /** {@inheritDoc} */
        @Override public boolean equals(Object o) {
            if (this == o)
                return true;

            if (o == null || getClass() != o.getClass())
                return false;

            TestKey testKey = (TestKey)o;

            return id == testKey.id && Arrays.equals(dummyData, testKey.dummyData);
        }

        /** {@inheritDoc} */
        @Override public int hashCode() {
            int result = Objects.hash(id);

            result = 31 * result + Arrays.hashCode(dummyData);

            return result;
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return "TestKey [id=" + id + "]";
        }
    }

    /**
     * Cache value holding a byte array of arbitrary size.
     */
    static class TestValue {
        /** Dummy payload; compared by content in {@code checkData}. */
        private final byte[] dummyData;

        /**
         * @param dummyData Dummy byte array (to test with various value sizes).
         */
        public TestValue(byte[] dummyData) {
            this.dummyData = dummyData;
        }
    }
}