blob: 1bb309fa5ccd4411d3e4647c7ecda598b2d92c0e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.affinity;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.apache.ignite.testframework.junits.common.GridCommonTest;
import org.junit.Test;
/**
* Tests for {@link GridAffinityProcessor}.
*/
@GridCommonTest(group = "Affinity Processor")
public class GridAffinityProcessorMemoryLeakTest extends GridCommonAbstractTest {
/** Cache name. */
private static final String CACHE_NAME = "cache";
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
((TcpDiscoverySpi)cfg.getDiscoverySpi()).setForceServerMode(true);
CacheConfiguration cacheCfg = defaultCacheConfiguration();
cacheCfg.setName(CACHE_NAME);
cacheCfg.setStoreKeepBinary(true);
cacheCfg.setCacheMode(CacheMode.LOCAL);
cfg.setCacheConfiguration(cacheCfg);
return cfg;
}
/** {@inheritDoc} */
@Override protected void afterTestsStopped() throws Exception {
stopAllGrids();
}
/**
* Test affinity functions caching and clean up.
*
* @throws Exception In case of any exception.
*/
@Test
@WithSystemProperty(key = IgniteSystemProperties.IGNITE_AFFINITY_HISTORY_SIZE, value = "10")
public void testAffinityProcessor() throws Exception {
int maxHistSize = 10;
Ignite ignite = startGrid(0);
IgniteKernal grid = (IgniteKernal)grid(0);
IgniteCache<String, String> cache;
IgniteCache<String, String> globalCache = getOrCreateGlobalCache(ignite);
IgniteDataStreamer<String, String> globalStreamer;
int cnt = maxHistSize * 30;
int expLimit = cnt / 2;
int size;
do {
try {
cache = createLocalCache(ignite, cnt);
cache.put("Key" + cnt, "Value" + cnt);
cache.destroy();
globalStreamer = createGlobalStreamer(ignite, globalCache);
globalStreamer.addData("GlobalKey" + cnt, "GlobalValue" + cnt);
globalStreamer.flush();
globalStreamer.close();
size = ((ConcurrentSkipListMap)GridTestUtils.getFieldValue(grid.context().affinity(), "affMap")).size();
assertTrue("Cache has size that bigger then expected [size=" + size +
", expLimit=" + expLimit + "]", size < expLimit);
}
catch (Exception e) {
fail("Error was handled [" + e.getMessage() + "]");
}
}
while (cnt-- > 0);
}
/**
* Creates global cache.
*
* @param ignite instance of {@code Ignite}.
* @param id unique id for local cache.
* @return local cache instance.
*/
private static IgniteCache<String, String> createLocalCache(Ignite ignite, long id) {
final String cacheName = "localCache" + id;
final CacheConfiguration<String, String> cCfg = new CacheConfiguration<>();
cCfg.setName(cacheName);
cCfg.setCacheMode(CacheMode.LOCAL);
cCfg.setGroupName("some group");
ignite.destroyCache(cacheName); // Local cache is not really local - reference can be kept by other nodes if restart during the load happens.
return ignite.createCache(cCfg).withKeepBinary();
}
/**
* Gets or creates global cache.
*
* @param ignite instance of {@code Ignite}.
* @return global cache instance.
*/
private static IgniteCache<String, String> getOrCreateGlobalCache(Ignite ignite) {
final String cacheName = "GlobalCache";
final CacheConfiguration<String, String> cCfg = new CacheConfiguration<>();
cCfg.setName(cacheName);
cCfg.setStoreKeepBinary(true);
cCfg.setCacheMode(CacheMode.PARTITIONED);
cCfg.setOnheapCacheEnabled(false);
cCfg.setCopyOnRead(false);
cCfg.setBackups(0);
cCfg.setWriteBehindEnabled(false);
cCfg.setReadThrough(false);
return ignite.getOrCreateCache(cCfg).withKeepBinary();
}
/**
* Creates streamer for global cache.
*
* @param ignite instance of {@code Ignite}.
* @param cache instance of global cache.
* @return instance of {@code IgniteDataStreamer}.
*/
private static IgniteDataStreamer<String, String> createGlobalStreamer(Ignite ignite,
IgniteCache<String, String> cache) {
IgniteDataStreamer<String, String> streamer = ignite.dataStreamer(cache.getName());
streamer.allowOverwrite(true);
streamer.skipStore(true);
streamer.keepBinary(false);
return streamer;
}
}