blob: 0ab8246fccc01489a8bc0a4236259c6dadbfb223 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.loadtests.communication;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.LongAdder;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.loadtests.util.GridCumulativeAverage;
import org.jetbrains.annotations.Nullable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
import static org.apache.ignite.testframework.GridLoadTestUtils.appendLineToFile;
import static org.apache.ignite.testframework.GridLoadTestUtils.startDaemon;
/**
* By default this benchmarks uses original Ignite configuration
* with message dispatching from NIO threads.
*
* By changing {@link #DFLT_CONFIG} constant you can use ForkJoin thread pool instead of JDK default.
*
* Note that you should run 2 processes of this test to get it running.
*/
public class GridIoManagerBenchmark {
/** */
public static final String DFLT_CONFIG = "modules/tests/config/io-manager-benchmark.xml";
/** */
private static final int DFLT_THREADS = 2;
/** */
private static final long WARM_UP_DUR = 30 * 1000;
/** */
private static final Semaphore sem = new Semaphore(10 * 1024);
/** */
public static final int TEST_TOPIC = 1;
/** */
private static final LongAdder msgCntr = new LongAdder();
/** */
private static final Map<IgniteUuid, CountDownLatch> latches = new ConcurrentHashMap<>();
/** */
private static final byte[][] arrs;
/** */
private static boolean testHeavyMsgs;
/** */
private static boolean testLatency;
/**
*
*/
static {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
arrs = new byte[64][];
for (int i = 0; i < arrs.length; i++) {
byte[] arr = new byte[rnd.nextInt(4096, 8192)];
for (int j = 0; j < arr.length; j++)
arr[j] = (byte)rnd.nextInt(0, 127);
arrs[i] = arr;
}
}
/**
* @param args Command line arguments.
*/
public static void main(String[] args) {
int threads = args.length > 0 ? Integer.parseInt(args[0]) : DFLT_THREADS;
int duration = args.length > 1 ? Integer.parseInt(args[1]) : 0;
String outputFilename = args.length > 2 ? args[2] : null;
String path = args.length > 3 ? args[3] : DFLT_CONFIG;
testHeavyMsgs = args.length > 4 && "true".equalsIgnoreCase(args[4]);
testLatency = args.length > 5 && "true".equalsIgnoreCase(args[5]);
// threads = 128;
// testLatency = true;
// testHeavyMsgs = true;
X.println("Config: " + path);
X.println("Test heavy messages: " + testHeavyMsgs);
X.println("Test latency: " + testLatency);
X.println("Threads: " + threads);
X.println("Duration: " + duration);
X.println("Output file name: " + outputFilename);
IgniteKernal g = (IgniteKernal)G.start(path);
if (g.localNode().order() > 1) {
try {
sendMessages(g, threads, duration, outputFilename);
}
finally {
G.stopAll(false);
}
}
else
receiveMessages(g);
}
/**
* @param g Kernal.
* @param threads Number of send threads.
* @param duration Test duration.
* @param outputFilename Output file name.
*/
private static void sendMessages(IgniteKernal g, int threads, int duration, @Nullable final String outputFilename) {
X.println(">>> Sending messages.");
g.context().io().addMessageListener(TEST_TOPIC, new SenderMessageListener());
Thread collector = startDaemon(new Runnable() {
@Override public void run() {
final long initTs = System.currentTimeMillis();
long ts = initTs;
long queries = msgCntr.sum();
GridCumulativeAverage qpsAvg = new GridCumulativeAverage();
try {
while (!Thread.currentThread().isInterrupted()) {
U.sleep(10000);
long newTs = System.currentTimeMillis();
long newQueries = msgCntr.sum();
long executed = newQueries - queries;
long time = newTs - ts;
long qps = executed * 1000 / time;
boolean recordAvg = ts - initTs > WARM_UP_DUR;
if (recordAvg) qpsAvg.update(qps);
X.println("Communication benchmark [qps=" + qps + (recordAvg ? ", qpsAvg=" + qpsAvg : "") +
", executed=" + executed + ", time=" + time + ']');
ts = newTs;
queries = newQueries;
}
}
catch (IgniteInterruptedCheckedException ignored) {
// No-op.
}
X.println("Average QPS: " + qpsAvg);
if (outputFilename != null) {
try {
X.println("Saving results to output file: " + outputFilename);
appendLineToFile(outputFilename, "%s,%d", IgniteUtils.LONG_DATE_FMT.format(
Instant.now()), qpsAvg.get());
}
catch (IOException e) {
X.println("Failed to record results to a file: " + e.getMessage());
}
}
}
});
Collection<SendThread> sndThreads = new ArrayList<>(threads);
for (int i = 0; i < threads; i++) {
SendThread t = new SendThread(g);
sndThreads.add(t);
t.start();
}
try {
U.sleep(duration > 0 ? duration * 1000 + WARM_UP_DUR : Long.MAX_VALUE);
}
catch (IgniteInterruptedCheckedException ignored) {
// No-op.
}
collector.interrupt();
for (SendThread t : sndThreads)
t.interrupt();
}
/**
* @param g Kernal.
*/
private static void receiveMessages(final IgniteKernal g) {
X.println(">>> Receiving messages.");
final GridIoManager io = g.context().io();
GridMessageListener lsnr = new GridMessageListener() {
private ClusterNode node;
@Override public void onMessage(UUID nodeId, Object msg, byte plc) {
if (node == null)
node = g.context().discovery().node(nodeId);
GridTestMessage testMsg = ((GridTestMessage)msg);
testMsg.bytes(null);
try {
io.sendToCustomTopic(node, TEST_TOPIC, testMsg, PUBLIC_POOL);
}
catch (IgniteCheckedException e) {
e.printStackTrace();
}
}
};
io.addMessageListener(TEST_TOPIC, lsnr);
}
/**
*
*/
private static class SendThread extends Thread {
/** */
private final IgniteKernal g;
/**
* @param g Kernal.
*/
SendThread(IgniteKernal g) {
this.g = g;
}
/** {@inheritDoc} */
@Override public void run() {
try {
ClusterNode dst = awaitOther(g.context().discovery());
GridIoManager io = g.context().io();
Random rnd = ThreadLocalRandom.current();
IgniteUuid msgId = IgniteUuid.randomUuid();
while (!Thread.interrupted()) {
CountDownLatch latch = null;
if (testLatency)
latches.put(msgId, latch = new CountDownLatch(1));
else
sem.acquire();
io.sendToCustomTopic(
dst,
TEST_TOPIC,
new GridTestMessage(msgId, testHeavyMsgs ? arrs[rnd.nextInt(arrs.length)] : null),
PUBLIC_POOL);
if (testLatency && !latch.await(1000, MILLISECONDS))
throw new RuntimeException("Failed to await latch.");
}
}
catch (IgniteCheckedException e) {
e.printStackTrace();
}
catch (InterruptedException ignored) {
// No-op.
}
}
/**
* @param disc Discovery.
* @return Second node in the topology.
* @throws InterruptedException If interrupted.
*/
@SuppressWarnings("BusyWait")
private ClusterNode awaitOther(final GridDiscoveryManager disc) throws InterruptedException {
while (disc.allNodes().size() < 2)
Thread.sleep(1000);
for (ClusterNode node : disc.allNodes())
if (!F.eqNodes(node, disc.localNode()))
return node;
assert false;
return null;
}
}
/**
*
*/
private static class SenderMessageListener implements GridMessageListener {
/** {@inheritDoc} */
@Override public void onMessage(UUID nodeId, Object msg, byte plc) {
msgCntr.increment();
if (testLatency)
latches.get(((GridTestMessage)msg).id()).countDown();
else
sem.release();
}
}
}