/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.warmup;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.warmup.LoadAllWarmUpStrategy.LoadPartition;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import static java.util.Collections.emptyList;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toMap;
/**
 * Tests for {@link LoadAllWarmUpStrategy}.
 */
public class LoadAllWarmUpStrategySelfTest extends GridCommonAbstractTest {
/** Flag for enabling warm-up. */
private boolean warmUp;
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
stopAllGrids();
cleanPersistenceDir();
LoadAllWarmUpStrategyEx.loadDataInfoCb = null;
}
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
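        // Two persistent data regions; warm-up is configured on them only when the flag is set.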
return super.getConfiguration(igniteInstanceName)
.setPluginProviders(new WarmUpTestPluginProvider())
.setDataStorageConfiguration(
new DataStorageConfiguration()
.setDataRegionConfigurations(
new DataRegionConfiguration().setName("dr_0").setPersistenceEnabled(true)
.setWarmUpConfiguration(!warmUp ? null : new LoadAllWarmUpConfigurationEx()),
new DataRegionConfiguration().setName("dr_1").setPersistenceEnabled(true)
.setWarmUpConfiguration(!warmUp ? null : new LoadAllWarmUpConfigurationEx())
)
).setCacheConfiguration(
cacheCfg("c_0", "g_0", "dr_0", Organization.queryEntity()),
cacheCfg("c_1", "g_0", "dr_0", Person.queryEntity()),
cacheCfg("c_2", "g_1", "dr_1", Organization.queryEntity()),
cacheCfg("c_3", "g_1", "dr_1", Person.queryEntity())
);
}
    /**
     * Tests that the number of pages loaded before a restart equals the number of pages warmed up after it.
     * <p/>
     * Steps:
     * 1) Start a node with static and dynamic caches and fill them;
     * 2) Make a checkpoint and get the number of loaded pages;
     * 3) Restart the node with warm-up enabled and get the number of warmed-up pages;
     * 4) Check that the numbers of loaded and warmed-up pages are equal.
     *
     * @throws Exception If failed.
     */
@Test
public void testSimple() throws Exception {
IgniteEx n = startGrid(0);
n.cluster().state(ClusterState.ACTIVE);
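        // Create a dynamic cache in its own group in addition to the statically configured ones.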
        IgniteCache<String, Integer> c4 = n.getOrCreateCache(cacheCfg("c_4", "g_2", "dr_0"));
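        // Fill all caches so that every data region has pages to load after a restart.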
for (int i = 0; i < 5_000; i++) {
n.cache("c_0").put("c_0" + i, new Organization(i, "c_0" + i));
n.cache("c_1").put("c_1" + i, new Person(i, "c_1" + i, i));
n.cache("c_2").put("c_2" + i, new Organization(i, "c_2" + i));
n.cache("c_3").put("c_3" + i, new Person(i, "c_3" + i, i));
c4.put("c_4" + i, ThreadLocalRandom.current().nextInt());
}
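        // Persist the data and remember how many pages each data region has loaded.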
forceCheckpoint();
        Map<String, Long> expLoadedPages = loadedDataRegionPages(n);
stopAllGrids();
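        // Restart the node with warm-up enabled.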
warmUp = true;
n = startGrid(0);
        Map<String, Long> actLoadedPages = loadedDataRegionPages(n);
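        // Per region, the warmed-up page count must match the page count loaded before the restart.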
assertEquals(expLoadedPages.size(), actLoadedPages.size());
expLoadedPages.forEach((regName, loadedPages) -> {
assertTrue(regName, actLoadedPages.containsKey(regName));
assertEquals(regName, loadedPages, actLoadedPages.get(regName));
});
}
    /**
     * Tests that if page memory is smaller than the PDS, not all pages in the PDS are warmed up.
     * Evictions may occur during warm-up, so the count of loaded pages is not the maximum.
     * <p/>
     * Steps:
     * 1) Start a node and fill the first data region until it holds 2 * {@code MIN_PAGE_MEMORY_SIZE} of data;
     * 2) Make a checkpoint;
     * 3) Restart the node with warm-up enabled, with the maximum size of the data region reduced to
     * {@code MIN_PAGE_MEMORY_SIZE}, and listen for {@link LoadAllWarmUpStrategyEx#loadDataInfoCb};
     * 4) Check that the estimated count of pages to warm up is between the maximum and
     * the approximate minimum count of pages to load;
     * 5) Check that the total count of loaded pages is between the maximum and
     * the approximate minimum count of pages to load.
     *
     * The values are approximate because some pages are already loaded at the beginning of warm-up,
     * and evictions occur during warm-up.
     *
     * @throws Exception If failed.
     */
@Test
public void testMemoryLessPds() throws Exception {
IgniteEx n = startGrid(0);
n.cluster().state(ClusterState.ACTIVE);
int i = 0;
final long minMemSize = U.field(IgniteCacheDatabaseSharedManager.class, "MIN_PAGE_MEMORY_SIZE");
DataRegion dr_0 = n.context().cache().context().database().dataRegion("dr_0");
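        // Fill "dr_0" until its loaded pages occupy twice the minimum page memory size.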
while (dr_0.pageMemory().loadedPages() * dr_0.pageMemory().systemPageSize() < 2 * minMemSize) {
n.cache("c_0").put("c_0" + i, new Organization(i, "c_0" + i));
n.cache("c_1").put("c_1" + i, new Person(i, "c_1" + i, i));
i++;
}
forceCheckpoint();
stopAllGrids();
warmUp = true;
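        // Shrink "dr_0" to the minimum page memory size so that not all persisted pages fit into memory.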
IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(0));
cfg.getDataStorageConfiguration().getDataRegionConfigurations()[0].setMaxSize(minMemSize);
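        // Capture the warm-up plan (partitions and their page counts) computed by the strategy.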
Map<String, Map<CacheGroupContext, List<LoadPartition>>> loadDataInfoMap = new ConcurrentHashMap<>();
LoadAllWarmUpStrategyEx.loadDataInfoCb = loadDataInfoMap::put;
n = startGrid(cfg);
dr_0 = n.context().cache().context().database().dataRegion("dr_0");
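        // Estimated count of pages to warm up, summed over all partitions planned for "dr_0".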
long warmUpPageCnt =
loadDataInfoMap.get("dr_0").values().stream().flatMap(Collection::stream).mapToLong(LoadPartition::pages).sum();
long maxLoadPages = minMemSize / dr_0.pageMemory().systemPageSize();
long minLoadPages = maxLoadPages - 100;
long loadPages = dr_0.pageMemory().loadedPages();
// There are loaded pages before warm-up.
assertTrue(warmUpPageCnt >= minLoadPages && warmUpPageCnt <= maxLoadPages);
// Pages may be evicted.
assertTrue(loadPages >= minLoadPages && loadPages <= maxLoadPages);
}
/**
     * Creates a cache configuration.
*
* @param name Cache name.
* @param grpName Cache group name.
* @param regName Data region name.
* @param qryEntities Query entities.
* @return New cache configuration.
*/
private CacheConfiguration cacheCfg(String name, String grpName, String regName, QueryEntity... qryEntities) {
requireNonNull(name);
requireNonNull(grpName);
requireNonNull(regName);
return new CacheConfiguration(name)
.setGroupName(grpName)
.setDataRegionName(regName)
.setAffinity(new GapRendezvousAffinityFunction(false, 5))
.setQueryEntities(Arrays.asList(qryEntities));
}
    /**
     * Counts loaded pages for each data region.
     *
     * @param n Node.
     * @return Mapping: {dataRegionName -> loadedPageCnt}.
     */
    private Map<String, Long> loadedDataRegionPages(IgniteEx n) {
requireNonNull(n);
return n.context().cache().cacheGroups().stream()
.filter(grpCtx -> grpCtx.userCache() && grpCtx.persistenceEnabled())
            // Check that a gap exists among the local partitions.
.peek(grpCtx -> assertTrue(grpCtx.topology().localPartitions().size() < grpCtx.topology().partitions()))
.map(CacheGroupContext::dataRegion)
.distinct()
.collect(toMap(region -> region.config().getName(), region -> region.pageMemory().loadedPages()));
}
    /**
     * {@link RendezvousAffinityFunction} that leaves a gap partition, i.e. a partition to which no nodes are assigned.
     */
private static class GapRendezvousAffinityFunction extends RendezvousAffinityFunction {
/** Gap partition id. */
public static final int GAP_PART = 2;
        /**
         * Constructor that invokes {@link RendezvousAffinityFunction#RendezvousAffinityFunction(boolean, int)}.
         */
public GapRendezvousAffinityFunction(boolean exclNeighbors, int parts) {
super(exclNeighbors, parts);
assert parts > GAP_PART : parts;
}
/** {@inheritDoc} */
@Override public List<ClusterNode> assignPartition(
int part,
List<ClusterNode> nodes,
int backups,
@Nullable Map<UUID, Collection<ClusterNode>> neighborhoodCache
) {
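            // The gap partition gets no owners, so it is never created on any node.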
return part == GAP_PART ? emptyList() : super.assignPartition(part, nodes, backups, neighborhoodCache);
}
/** {@inheritDoc} */
@Override public int partition(Object key) {
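            // Redirect keys that would map to the gap partition to its neighbor, so no data is ever stored in it.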
int part = super.partition(key);
return part == GAP_PART ? GAP_PART - 1 : part;
}
}
}