blob: a13d3ec68fc688eab9f1ebf029e6c45a63edec6e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.kafka.tools;
import static net.sourceforge.argparse4j.impl.Arguments.store;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import org.apache.kafka.common.utils.Utils;
public class ProducerPerformance {
public static void main(String[] args) throws Exception {
ArgumentParser parser = argParser();
try {
Namespace res = parser.parseArgs(args);
/* parse args */
String topicName = res.getString("topic");
long numRecords = res.getLong("numRecords");
int recordSize = res.getInt("recordSize");
int throughput = res.getInt("throughput");
List<String> producerProps = res.getList("producerConfig");
String producerConfig = res.getString("producerConfigFile");
if (producerProps == null && producerConfig == null) {
throw new ArgumentParserException("Either --producer-props or --producer.config must be specified.", parser);
}
Properties props = new Properties();
if (producerConfig != null) {
props.putAll(Utils.loadProps(producerConfig));
}
if (producerProps != null)
for (String prop : producerProps) {
String[] pieces = prop.split("=");
if (pieces.length != 2)
throw new IllegalArgumentException("Invalid property: " + prop);
props.put(pieces[0], pieces[1]);
}
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
/* setup perf test */
byte[] payload = new byte[recordSize];
Random random = new Random(0);
for (int i = 0; i < payload.length; ++i)
payload[i] = (byte) (random.nextInt(26) + 65);
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topicName, payload);
Stats stats = new Stats(numRecords, 5000);
long startMs = System.currentTimeMillis();
ThroughputThrottler throttler = new ThroughputThrottler(throughput, startMs);
for (int i = 0; i < numRecords; i++) {
long sendStartMs = System.currentTimeMillis();
Callback cb = stats.nextCompletion(sendStartMs, payload.length, stats);
producer.send(record, cb);
if (throttler.shouldThrottle(i, sendStartMs)) {
throttler.throttle();
}
}
/* print final results */
producer.close();
stats.printTotal();
} catch (ArgumentParserException e) {
if (args.length == 0) {
parser.printHelp();
System.exit(0);
} else {
parser.handleError(e);
System.exit(1);
}
}
}
/** Get the command-line argument parser. */
private static ArgumentParser argParser() {
ArgumentParser parser = ArgumentParsers
.newArgumentParser("producer-performance")
.defaultHelp(true)
.description("This tool is used to verify the producer performance.");
parser.addArgument("--topic")
.action(store())
.required(true)
.type(String.class)
.metavar("TOPIC")
.help("produce messages to this topic");
parser.addArgument("--num-records")
.action(store())
.required(true)
.type(Long.class)
.metavar("NUM-RECORDS")
.dest("numRecords")
.help("number of messages to produce");
parser.addArgument("--record-size")
.action(store())
.required(true)
.type(Integer.class)
.metavar("RECORD-SIZE")
.dest("recordSize")
.help("message size in bytes");
parser.addArgument("--throughput")
.action(store())
.required(true)
.type(Integer.class)
.metavar("THROUGHPUT")
.help("throttle maximum message throughput to *approximately* THROUGHPUT messages/sec");
parser.addArgument("--producer-props")
.nargs("+")
.required(false)
.metavar("PROP-NAME=PROP-VALUE")
.type(String.class)
.dest("producerConfig")
.help("kafka producer related configuration properties like bootstrap.servers,client.id etc. " +
"These configs take precedence over those passed via --producer.config.");
parser.addArgument("--producer.config")
.action(store())
.required(false)
.type(String.class)
.metavar("CONFIG-FILE")
.dest("producerConfigFile")
.help("producer config properties file.");
return parser;
}
private static class Stats {
private long start;
private long windowStart;
private int[] latencies;
private int sampling;
private int iteration;
private int index;
private long count;
private long bytes;
private int maxLatency;
private long totalLatency;
private long windowCount;
private int windowMaxLatency;
private long windowTotalLatency;
private long windowBytes;
private long reportingInterval;
public Stats(long numRecords, int reportingInterval) {
this.start = System.currentTimeMillis();
this.windowStart = System.currentTimeMillis();
this.index = 0;
this.iteration = 0;
this.sampling = (int) (numRecords / Math.min(numRecords, 500000));
this.latencies = new int[(int) (numRecords / this.sampling) + 1];
this.index = 0;
this.maxLatency = 0;
this.totalLatency = 0;
this.windowCount = 0;
this.windowMaxLatency = 0;
this.windowTotalLatency = 0;
this.windowBytes = 0;
this.totalLatency = 0;
this.reportingInterval = reportingInterval;
}
public void record(int iter, int latency, int bytes, long time) {
this.count++;
this.bytes += bytes;
this.totalLatency += latency;
this.maxLatency = Math.max(this.maxLatency, latency);
this.windowCount++;
this.windowBytes += bytes;
this.windowTotalLatency += latency;
this.windowMaxLatency = Math.max(windowMaxLatency, latency);
if (iter % this.sampling == 0) {
this.latencies[index] = latency;
this.index++;
}
/* maybe report the recent perf */
if (time - windowStart >= reportingInterval) {
printWindow();
newWindow();
}
}
public Callback nextCompletion(long start, int bytes, Stats stats) {
Callback cb = new PerfCallback(this.iteration, start, bytes, stats);
this.iteration++;
return cb;
}
public void printWindow() {
long ellapsed = System.currentTimeMillis() - windowStart;
double recsPerSec = 1000.0 * windowCount / (double) ellapsed;
double mbPerSec = 1000.0 * this.windowBytes / (double) ellapsed / (1024.0 * 1024.0);
System.out.printf("%d records sent, %.1f records/sec (%.2f MB/sec), %.1f ms avg latency, %.1f max latency.\n",
windowCount,
recsPerSec,
mbPerSec,
windowTotalLatency / (double) windowCount,
(double) windowMaxLatency);
}
public void newWindow() {
this.windowStart = System.currentTimeMillis();
this.windowCount = 0;
this.windowMaxLatency = 0;
this.windowTotalLatency = 0;
this.windowBytes = 0;
}
public void printTotal() {
long elapsed = System.currentTimeMillis() - start;
double recsPerSec = 1000.0 * count / (double) elapsed;
double mbPerSec = 1000.0 * this.bytes / (double) elapsed / (1024.0 * 1024.0);
int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999);
System.out.printf("%d records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.\n",
count,
recsPerSec,
mbPerSec,
totalLatency / (double) count,
(double) maxLatency,
percs[0],
percs[1],
percs[2],
percs[3]);
}
private static int[] percentiles(int[] latencies, int count, double... percentiles) {
int size = Math.min(count, latencies.length);
Arrays.sort(latencies, 0, size);
int[] values = new int[percentiles.length];
for (int i = 0; i < percentiles.length; i++) {
int index = (int) (percentiles[i] * size);
values[i] = latencies[index];
}
return values;
}
}
private static final class PerfCallback implements Callback {
private final long start;
private final int iteration;
private final int bytes;
private final Stats stats;
public PerfCallback(int iter, long start, int bytes, Stats stats) {
this.start = start;
this.stats = stats;
this.iteration = iter;
this.bytes = bytes;
}
public void onCompletion(RecordMetadata metadata, Exception exception) {
long now = System.currentTimeMillis();
int latency = (int) (now - start);
this.stats.record(iteration, latency, bytes, now);
if (exception != null)
exception.printStackTrace();
}
}
}