blob: e8a0176154813110f5c1825af98448b02c9f8105 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.database;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
import org.apache.ignite.internal.processors.cache.persistence.DataStructure;
import org.junit.Test;
import static org.apache.ignite.IgniteSystemProperties.getInteger;
/**
* Base class for memory leaks tests.
*/
public abstract class IgniteDbMemoryLeakAbstractTest extends IgniteDbAbstractTest {
/** */
private static final int CONCURRENCY_LEVEL = 16;
/** */
private static final int MIN_PAGE_CACHE_SIZE = 1048576 * CONCURRENCY_LEVEL;
/** */
private volatile Exception ex;
/** */
private long warmUpEndTime;
/** */
private long endTime;
/** */
private long loadedPages;
/** */
private long delta;
/** */
private long probeCnt;
/** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
DataStructure.rnd = null;
long startTime = System.nanoTime();
warmUpEndTime = startTime + TimeUnit.SECONDS.toNanos(warmUp());
endTime = warmUpEndTime + TimeUnit.SECONDS.toNanos(duration());
}
/** {@inheritDoc} */
@Override protected void configure(IgniteConfiguration cfg) {
cfg.setMetricsLogFrequency(5000);
}
/** {@inheritDoc} */
@Override protected void configure(DataStorageConfiguration mCfg) {
mCfg.setConcurrencyLevel(CONCURRENCY_LEVEL);
long size = (1024 * (isLargePage() ? 16 : 4) + 24) * pagesMax();
mCfg.setDefaultDataRegionConfiguration(
new DataRegionConfiguration().setMaxSize(Math.max(size, MIN_PAGE_CACHE_SIZE)).setName("default"));
}
/**
* @return Test duration in seconds.
*/
protected int duration() {
return getInteger("IGNITE_MEMORY_LEAKS_TEST_DURATION", 300);
}
/**
* @return Warm up duration in seconds.
*/
@SuppressWarnings("WeakerAccess")
protected int warmUp() {
return getInteger("IGNITE_MEMORY_LEAKS_TEST_WARM_UP", 450);
}
/** {@inheritDoc} */
@Override protected int gridCount() {
return 1;
}
/** {@inheritDoc} */
@Override protected boolean indexingEnabled() {
return false;
}
/** {@inheritDoc} */
@Override protected long getTestTimeout() {
return (warmUp() + duration() + 10) * 1000; // Extra seconds to stop all threads.
}
/**
* @param ig Ignite instance.
* @return IgniteCache.
*/
protected abstract IgniteCache<Object, Object> cache(IgniteEx ig);
/**
* @return Cache key to perform an operation.
*/
protected abstract Object key();
/**
* @param key Cache key to perform an operation.
* @return Cache value to perform an operation.
*/
protected abstract Object value(Object key);
/**
* @param cache IgniteCache.
*/
protected void operation(IgniteCache<Object, Object> cache) {
Object key = key();
Object val = value(key);
switch (nextInt(3)) {
case 0:
cache.getAndPut(key, val);
break;
case 1:
cache.get(key);
break;
case 2:
cache.getAndRemove(key);
}
}
/**
* @param bound Upper bound (exclusive). Must be positive.
* @return Random int value.
*/
protected static int nextInt(int bound) {
return ThreadLocalRandom.current().nextInt(bound);
}
/**
* @return Random int value.
*/
protected static int nextInt() {
return ThreadLocalRandom.current().nextInt();
}
/**
* @throws Exception If failed.
*/
@Test
public void testMemoryLeak() throws Exception {
final IgniteEx ignite = grid(0);
final IgniteCache<Object, Object> cache = cache(ignite);
Runnable target = new Runnable() {
@Override public void run() {
while (ex == null && System.nanoTime() < endTime) {
try {
operation(cache);
}
catch (Exception e) {
ex = e;
break;
}
}
}
};
Thread[] threads = new Thread[CONCURRENCY_LEVEL];
info("Warming up is started.");
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(target);
threads[i].start();
}
while (ex == null && System.nanoTime() < warmUpEndTime)
Thread.sleep(100);
if (ex != null)
throw ex;
info("Warming up is ended.");
while (ex == null && System.nanoTime() < endTime) {
try {
check(cache);
}
catch (Exception e) {
ex = e;
break;
}
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
}
if (ex != null)
throw ex;
}
/**
* Callback to check the current state.
*
* @param cache Cache instance.
* @throws Exception If failed.
*/
protected final void check(IgniteCache cache) throws Exception {
long pagesActual = ((IgniteCacheProxy)cache).context().dataRegion().pageMemory().loadedPages();
if (loadedPages > 0) {
delta += pagesActual - loadedPages;
int allowedDelta = pagesDelta();
if (probeCnt++ > 12) { // We need some statistic first. Minimal statistic is taken for a minute.
long actualDelta = delta / probeCnt;
assertTrue(
"Average growth pages in the number is more than expected [allowed=" + allowedDelta + ", actual=" + actualDelta + "]",
actualDelta <= allowedDelta);
}
}
long pagesAllowed = pagesMax();
assertTrue(
"Allocated pages count is more than expected [allowed=" + pagesAllowed + ", actual=" + pagesActual + "]",
pagesActual < pagesAllowed
);
loadedPages = pagesActual;
}
/**
* @return Maximal allowed pages number.
*/
protected abstract long pagesMax();
/**
* @return Expected average number of pages, on which their total number can grow per 5 seconds.
*/
@SuppressWarnings("WeakerAccess")
protected int pagesDelta() {
return 3;
}
}