/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.rebalancing;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Stream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.util.lang.IgniteClosure2X;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.CallbackExecutorLogListener;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;

import static java.util.Arrays.asList;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;

/**
 * Tests rebalance statistics.
 */
public class RebalanceStatisticsTest extends GridCommonAbstractTest {
    /** Cache configurations. */
    private CacheConfiguration[] cacheCfgs;

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        stopAllGrids();

        super.afterTest();
    }

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
        return super.getConfiguration(igniteInstanceName)
            .setConsistentId(igniteInstanceName)
            .setCacheConfiguration(cacheCfgs)
            .setRebalanceThreadPoolSize(5)
            .setCommunicationSpi(new TestRecordingCommunicationSpi());
    }

    /**
     * Test statistics of a rebalance.
     *
     * Steps:
     * 1) Create and fill a cluster;
     * 2) Start a new node while listening for log and supply messages;
     * 3) Check that the number of log messages equals the number of supply messages plus one;
     * 4) Find the corresponding log message for each supply message;
     * 5) Find the log message printed after all groups have rebalanced and check its correctness.
     *
     * @throws Exception If any error occurs.
     */
    @Test
    public void testRebalanceStatistics() throws Exception {
        createCluster(3);
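        // Configure the joining node with a listening logger so that rebalance completion messages can be captured.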
        ListeningTestLogger listeningTestLog = new ListeningTestLogger(log);

        IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(3)).setGridLogger(listeningTestLog);

        // Collect log messages with rebalance statistics.
        Collection<String> logMsgs = new ConcurrentLinkedQueue<>();

        listeningTestLog.registerListener(
            new CallbackExecutorLogListener("Completed( \\(final\\))? rebalanc(ing|e chain).*", logMsgs::add)
        );
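        // Record every GridDhtPartitionSupplyMessage sent by the existing nodes.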
        Map<Ignite, Collection<T2<ClusterNode, Message>>> recordMsgs = new ConcurrentHashMap<>();

        G.allGrids().forEach(n -> TestRecordingCommunicationSpi.spi(n).record((node, msg) -> {
            if (GridDhtPartitionSupplyMessage.class.isInstance(msg))
                recordMsgs.computeIfAbsent(n, n1 -> new ConcurrentLinkedQueue<>()).add(new T2<>(node, msg));

            return false;
        }));
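        // Start the new node and wait until rebalancing to it has completed.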
        IgniteEx node = startGrid(cfg);

        awaitPartitionMapExchange();

        // Collect supply messages addressed to the new node only.
        Map<Ignite, List<GridDhtPartitionSupplyMessage>> supplyMsgs = G.allGrids().stream()
            .filter(n -> !n.equals(node))
            .collect(
                toMap(
                    identity(),
                    n -> recordMsgs.get(n).stream()
                        .filter(t2 -> t2.get1().id().equals(node.localNode().id()))
                        .map(IgniteBiTuple::get2)
                        .map(GridDhtPartitionSupplyMessage.class::cast)
                        .collect(toList())
                )
            );

        // +1 because there is one extra log message about the end of rebalancing for all groups.
        assertEquals(supplyMsgs.values().stream().mapToInt(List::size).sum() + 1, logMsgs.size());
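        // Closure that returns the marshalled size of a cache entry in bytes.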
        IgniteClosure2X<GridCacheEntryInfo, CacheObjectContext, Long> getSize =
            new IgniteClosure2X<GridCacheEntryInfo, CacheObjectContext, Long>() {
                /** {@inheritDoc} */
                @Override public Long applyx(
                    GridCacheEntryInfo info,
                    CacheObjectContext ctx
                ) throws IgniteCheckedException {
                    return (long)info.marshalledSize(ctx);
                }
            };
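        // For each supplier node, match every supply message against the per-group rebalance log message.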
        for (Map.Entry<Ignite, List<GridDhtPartitionSupplyMessage>> supplyMsg : supplyMsgs.entrySet()) {
            List<String> supplierMsgs = logMsgs.stream()
                .filter(s -> s.contains("supplier=" + supplyMsg.getKey().cluster().localNode().id()))
                .collect(toList());

            List<GridDhtPartitionSupplyMessage> msgs = supplyMsg.getValue();

            assertEquals(msgs.size(), supplierMsgs.size());

            for (GridDhtPartitionSupplyMessage msg : msgs) {
                Map<Integer, CacheEntryInfoCollection> infos = U.field(msg, "infos");

                CacheGroupContext grpCtx = node.context().cache().cacheGroup(msg.groupId());

                long bytes = 0;

                for (CacheEntryInfoCollection c : infos.values()) {
                    for (GridCacheEntryInfo i : c.infos())
                        bytes += getSize.apply(i, grpCtx.cacheObjectContext());
                }

                String[] checkVals = {
                    "grp=" + grpCtx.cacheOrGroupName(),
                    "partitions=" + infos.size(),
                    "entries=" + infos.values().stream().mapToInt(i -> i.infos().size()).sum(),
                    "topVer=" + msg.topologyVersion(),
                    "rebalanceId=" + U.field(msg, "rebalanceId"),
                    "bytesRcvd=" + U.humanReadableByteCount(bytes),
                    "fullPartitions=" + infos.size(),
                    "fullEntries=" + infos.values().stream().mapToInt(i -> i.infos().size()).sum(),
                    "fullBytesRcvd=" + U.humanReadableByteCount(bytes),
                    "histPartitions=0",
                    "histEntries=0",
                    "histBytesRcvd=0",
                };

                assertTrue(
                    "msgs=" + supplierMsgs + ", checkVals=" + asList(checkVals),
                    supplierMsgs.stream().anyMatch(s -> Stream.of(checkVals).allMatch(s::contains))
                );
            }
        }
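        // Verify the summary message that is printed once the whole rebalance chain has completed.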
        String rebChainMsg = logMsgs.stream().filter(s -> s.startsWith("Completed rebalance chain")).findAny().get();

        long rebId = -1;
        int parts = 0;
        int entries = 0;
        long bytes = 0;
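        // Aggregate the expected totals over all recorded supply messages.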
        for (List<GridDhtPartitionSupplyMessage> msgs : supplyMsgs.values()) {
            for (GridDhtPartitionSupplyMessage msg : msgs) {
                Map<Integer, CacheEntryInfoCollection> infos = U.field(msg, "infos");

                rebId = U.field(msg, "rebalanceId");
                parts += infos.size();
                entries += infos.values().stream().mapToInt(i -> i.infos().size()).sum();

                CacheObjectContext cacheObjCtx = node.context().cache().cacheGroup(msg.groupId()).cacheObjectContext();

                for (CacheEntryInfoCollection c : infos.values()) {
                    for (GridCacheEntryInfo i : c.infos())
                        bytes += getSize.apply(i, cacheObjCtx);
                }
            }
        }

        String[] checkVals = {
            "partitions=" + parts,
            "entries=" + entries,
            "rebalanceId=" + rebId,
            "bytesRcvd=" + U.humanReadableByteCount(bytes),
        };

        assertTrue(rebChainMsg, Stream.of(checkVals).allMatch(rebChainMsg::contains));
    }

    /**
     * Create and populate a cluster.
     *
     * @param nodeCnt Node count.
     * @return Coordinator.
     * @throws Exception If any error occurs.
     */
    private IgniteEx createCluster(int nodeCnt) throws Exception {
        String grpName0 = "grp0";
        String grpName1 = "grp1";

        cacheCfgs = new CacheConfiguration[] {
            cacheConfiguration("ch_0_0", grpName0, 10, 2),
            cacheConfiguration("ch_0_1", grpName0, 10, 2),
            cacheConfiguration("ch_0_2", grpName0, 10, 2),
            cacheConfiguration("ch_1_0", grpName1, 10, 2),
            cacheConfiguration("ch_1_1", grpName1, 10, 2),
        };
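        // Start the nodes, activate the cluster and preload data into every cache.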
        IgniteEx crd = startGrids(nodeCnt);

        crd.cluster().active(true);

        populateCluster(crd, 10, "");

        return crd;
    }

    /**
     * Populate all partitions of all caches with data through the given node.
     *
     * @param node Node.
     * @param cnt Count of values per partition.
     * @param add Additional value postfix.
     */
    private void populateCluster(IgniteEx node, int cnt, String add) {
        requireNonNull(node);
        requireNonNull(add);
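        // Write cnt values into every partition of every configured cache.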
        for (CacheConfiguration cacheCfg : cacheCfgs) {
            String cacheName = cacheCfg.getName();
            IgniteCache<Object, Object> cache = node.cache(cacheName);

            for (int i = 0; i < cacheCfg.getAffinity().partitions(); i++) {
                partitionKeys(cache, i, cnt, i * cnt)
                    .forEach(k -> cache.put(k, cacheName + "_val_" + k + add));
            }
        }
    }

    /**
     * Create a cache configuration.
     *
     * @param cacheName Cache name.
     * @param grpName Cache group name.
     * @param parts Count of partitions.
     * @param backups Count of backups.
     * @return Cache configuration.
     */
    private CacheConfiguration cacheConfiguration(String cacheName, @Nullable String grpName, int parts, int backups) {
        requireNonNull(cacheName);

        return new CacheConfiguration<>(cacheName)
            .setCacheMode(CacheMode.PARTITIONED)
            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
            .setAffinity(new RendezvousAffinityFunction(false, parts))
            .setBackups(backups)
            .setGroupName(grpName);
    }
}