blob: 24bf52e3ab2ff409657df098b93e6a5ea9453143 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.managers.communication;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.lang.GridAbsPredicateX;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteCallable;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
/**
*
*/
public class IgniteCommunicationBalanceTest extends GridCommonAbstractTest {
/** */
private int selectors;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
TcpCommunicationSpi commSpi = ((TcpCommunicationSpi)cfg.getCommunicationSpi());
commSpi.setSharedMemoryPort(-1);
commSpi.setConnectionsPerNode(connectionsPerNode());
commSpi.setUsePairedConnections(usePairedConnections());
if (selectors > 0)
commSpi.setSelectorsCount(selectors);
if (sslEnabled())
cfg.setSslContextFactory(GridTestUtils.sslFactory());
return cfg;
}
/**
* @return {@code True} to enable SSL.
*/
protected boolean sslEnabled() {
return false;
}
/**
* @return Value for {@link TcpCommunicationSpi#setUsePairedConnections(boolean)}.
*/
protected boolean usePairedConnections() {
return false;
}
/**
* @return Connections per node.
*/
protected int connectionsPerNode() {
return 1;
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
stopAllGrids();
super.afterTest();
}
/**
* @throws Exception If failed.
*/
@Test
public void testBalance1() throws Exception {
if (sslEnabled())
return;
System.setProperty(IgniteSystemProperties.IGNITE_IO_BALANCE_PERIOD, "5000");
try {
selectors = 4;
final int SRVS = 6;
startGridsMultiThreaded(SRVS);
final Ignite client = startClientGrid(SRVS);
for (int i = 0; i < SRVS; i++) {
ClusterNode node = client.cluster().node(ignite(i).cluster().localNode().id());
client.compute(client.cluster().forNode(node)).call(new DummyCallable(null));
}
waitNioBalanceStop(Collections.singletonList(client), 10_000);
final GridNioServer srv = GridTestUtils.getFieldValue(client.configuration().getCommunicationSpi(), "nioSrvr");
ThreadLocalRandom rnd = ThreadLocalRandom.current();
long readMoveCnt1 = srv.readerMoveCount();
long writeMoveCnt1 = srv.writerMoveCount();
int prevNodeIdx = -1;
for (int iter = 0; iter < 10; iter++) {
int nodeIdx = rnd.nextInt(SRVS);
while (prevNodeIdx == nodeIdx)
nodeIdx = rnd.nextInt(SRVS);
prevNodeIdx = nodeIdx;
log.info("Iteration [iter=" + iter + ", node=" + nodeIdx + ']');
final long readMoveCnt = readMoveCnt1;
final long writeMoveCnt = writeMoveCnt1;
final int nodeIdx0 = nodeIdx;
GridTestUtils.waitForCondition(new GridAbsPredicate() {
@Override public boolean apply() {
byte[] data = new byte[100_000];
for (int j = 0; j < 10; j++) {
for (int i = 0; i < SRVS; i++) {
ClusterNode node = client.cluster().node(ignite(i).cluster().localNode().id());
IgniteCompute compute = client.compute(client.cluster().forNode(node));
compute.call(new DummyCallable(i == nodeIdx0 ? data : null));
}
}
if (usePairedConnections())
return srv.readerMoveCount() > readMoveCnt && srv.writerMoveCount() > writeMoveCnt;
else
return srv.readerMoveCount() > readMoveCnt || srv.writerMoveCount() > writeMoveCnt;
}
}, 30_000);
waitNioBalanceStop(Collections.singletonList(client), 30_000);
long readMoveCnt2 = srv.readerMoveCount();
long writeMoveCnt2 = srv.writerMoveCount();
log.info("Move counts [rc1=" + readMoveCnt1 +
", wc1=" + writeMoveCnt1 +
", rc2=" + readMoveCnt2 +
", wc2=" + writeMoveCnt2 + ']');
if (usePairedConnections()) {
assertTrue(readMoveCnt2 > readMoveCnt1);
assertTrue(writeMoveCnt2 > writeMoveCnt1);
}
else
assertTrue(readMoveCnt2 > readMoveCnt1 || writeMoveCnt2 > writeMoveCnt1);
readMoveCnt1 = readMoveCnt2;
writeMoveCnt1 = writeMoveCnt2;
}
waitNioBalanceStop(G.allGrids(), 10_000);
}
finally {
System.setProperty(IgniteSystemProperties.IGNITE_IO_BALANCE_PERIOD, "");
}
}
/**
* @throws Exception If failed.
*/
@Test
public void testBalance2() throws Exception {
System.setProperty(IgniteSystemProperties.IGNITE_IO_BALANCE_PERIOD, "1000");
try {
startGridsMultiThreaded(5);
startClientGridsMultiThreaded(5, 5);
for (int i = 0; i < 5; i++) {
log.info("Iteration: " + i);
final AtomicInteger idx = new AtomicInteger();
GridTestUtils.runMultiThreaded(new Callable<Void>() {
@Override public Void call() throws Exception {
Ignite node = ignite(idx.incrementAndGet() % 10);
ThreadLocalRandom rnd = ThreadLocalRandom.current();
int msgs = rnd.nextInt(500, 600);
for (int i = 0; i < msgs; i++) {
int sndTo = rnd.nextInt(10);
ClusterNode sntToNode = node.cluster().node(ignite(sndTo).cluster().localNode().id());
IgniteCompute compute = node.compute(node.cluster().forNode(sntToNode));
compute.call(new DummyCallable(new byte[rnd.nextInt(rnd.nextInt(256, 1024))]));
}
return null;
}
}, 30, "test-thread");
waitNioBalanceStop(G.allGrids(), 10_000);
}
}
finally {
System.setProperty(IgniteSystemProperties.IGNITE_IO_BALANCE_PERIOD, "");
}
}
/**
* @param nodes Node.
* @param timeout Timeout.
* @throws Exception If failed.
*/
private void waitNioBalanceStop(List<Ignite> nodes, long timeout) throws Exception {
final List<GridNioServer> srvs = new ArrayList<>();
for (Ignite node : nodes) {
TcpCommunicationSpi spi = (TcpCommunicationSpi) node.configuration().getCommunicationSpi();
GridNioServer srv = GridTestUtils.getFieldValue(spi, "nioSrvr");
srvs.add(srv);
}
assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicateX() {
@Override public boolean applyx() throws IgniteCheckedException {
List<Long> rCnts = new ArrayList<>();
List<Long> wCnts = new ArrayList<>();
for (GridNioServer srv : srvs) {
long readerMovCnt1 = srv.readerMoveCount();
long writerMovCnt1 = srv.writerMoveCount();
rCnts.add(readerMovCnt1);
wCnts.add(writerMovCnt1);
}
U.sleep(2000);
for (int i = 0; i < srvs.size(); i++) {
GridNioServer srv = srvs.get(i);
long readerMovCnt1 = rCnts.get(i);
long writerMovCnt1 = wCnts.get(i);
long readerMovCnt2 = srv.readerMoveCount();
long writerMovCnt2 = srv.writerMoveCount();
if (readerMovCnt1 != readerMovCnt2) {
log.info("Readers balance is in progress [node=" + i + ", cnt1=" + readerMovCnt1 +
", cnt2=" + readerMovCnt2 + ']');
return false;
}
if (writerMovCnt1 != writerMovCnt2) {
log.info("Writers balance is in progress [node=" + i + ", cnt1=" + writerMovCnt1 +
", cnt2=" + writerMovCnt2 + ']');
return false;
}
}
return true;
}
}, timeout));
}
/**
* @throws Exception If failed.
*/
@Test
public void testRandomBalance() throws Exception {
System.setProperty(GridNioServer.IGNITE_IO_BALANCE_RANDOM_BALANCE, "true");
System.setProperty(IgniteSystemProperties.IGNITE_IO_BALANCE_PERIOD, "500");
try {
final int NODES = 10;
startGridsMultiThreaded(NODES);
final long stopTime = System.currentTimeMillis() + 60_000;
GridTestUtils.runMultiThreaded(new Callable<Object>() {
@Override public Object call() throws Exception {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
while (System.currentTimeMillis() < stopTime)
ignite(rnd.nextInt(NODES)).compute().broadcast(new DummyCallable(null));
return null;
}
}, 20, "test-thread");
}
finally {
System.setProperty(GridNioServer.IGNITE_IO_BALANCE_RANDOM_BALANCE, "");
System.setProperty(IgniteSystemProperties.IGNITE_IO_BALANCE_PERIOD, "");
}
}
/**
*
*/
private static class DummyCallable implements IgniteCallable<Object> {
/** */
private byte[] data;
/**
* @param data Data.
*/
DummyCallable(byte[] data) {
this.data = data;
}
/** {@inheritDoc} */
@Override public Object call() throws Exception {
return data;
}
}
}