/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */

package org.apache.kafka.streams.perf;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
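
/**
 * A simple, self-contained performance benchmark for Kafka clients and Kafka Streams.
 * It measures, in sequence, raw producer throughput, raw consumer throughput, and the
 * throughput of three minimal Streams topologies (source->process, source->sink and
 * source->state store) over NUM_RECORDS records of RECORD_SIZE bytes each, reporting
 * every result in MB/sec.
 */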
public class SimpleBenchmark {
private final String kafka;
private final String zookeeper;
private final File stateDir;
private static final String SOURCE_TOPIC = "simpleBenchmarkSourceTopic";
private static final String SINK_TOPIC = "simpleBenchmarkSinkTopic";
private static final long NUM_RECORDS = 10000000L;
private static final Long END_KEY = NUM_RECORDS - 1;
private static final int KEY_SIZE = 8;
private static final int VALUE_SIZE = 100;
private static final int RECORD_SIZE = KEY_SIZE + VALUE_SIZE;
private static final Serde<byte[]> BYTE_SERDE = Serdes.ByteArray();
private static final Serde<Long> LONG_SERDE = Serdes.Long();
public SimpleBenchmark(File stateDir, String kafka, String zookeeper) {
this.stateDir = stateDir;
this.kafka = kafka;
this.zookeeper = zookeeper;
}
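
    /**
     * Runs every benchmark phase in order: produce, consume, then the three Streams topologies.
     * Connection details are hard-coded: a broker on localhost:9092, ZooKeeper on localhost:2181,
     * and local state under /tmp/kafka-streams-simple-benchmark.
     *
     * One way to launch it from a Kafka distribution (illustrative, not prescribed by this class):
     *   bin/kafka-run-class.sh org.apache.kafka.streams.perf.SimpleBenchmark
     */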
public static void main(String[] args) throws Exception {
final File stateDir = new File("/tmp/kafka-streams-simple-benchmark");
stateDir.mkdir();
final File rocksdbDir = new File(stateDir, "rocksdb-test");
rocksdbDir.mkdir();
final String kafka = "localhost:9092";
final String zookeeper = "localhost:2181";
SimpleBenchmark benchmark = new SimpleBenchmark(stateDir, kafka, zookeeper);
// producer performance
benchmark.produce();
// consumer performance
benchmark.consume();
// simple stream performance source->process
benchmark.processStream();
// simple stream performance source->sink
benchmark.processStreamWithSink();
// simple stream performance source->store
benchmark.processStreamWithStateStore();
}
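
    /**
     * Measures source->process throughput: a Streams topology reads SOURCE_TOPIC and runs each
     * record through a no-op processor; the elapsed time until the processor sees END_KEY is
     * reported as MB/sec read.
     */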
public void processStream() {
CountDownLatch latch = new CountDownLatch(1);
final KafkaStreams streams = createKafkaStreams(stateDir, kafka, zookeeper, latch);
Thread thread = new Thread() {
public void run() {
streams.start();
}
};
thread.start();
long startTime = System.currentTimeMillis();
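        // Wait for the topology to reach the last key; Thread.interrupted() clears the interrupt
        // flag so the await can simply be retried on the next loop iteration.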
while (latch.getCount() > 0) {
try {
latch.await();
} catch (InterruptedException ex) {
Thread.interrupted();
}
}
long endTime = System.currentTimeMillis();
System.out.println("Streams Performance [MB/sec read]: " + megaBytePerSec(endTime - startTime));
streams.close();
try {
thread.join();
} catch (Exception ex) {
// ignore
}
}
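
    /**
     * Measures source->sink throughput: the same read path as processStream(), but every record
     * is also forwarded to SINK_TOPIC, so the reported figure covers reading and writing.
     */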
public void processStreamWithSink() {
CountDownLatch latch = new CountDownLatch(1);
final KafkaStreams streams = createKafkaStreamsWithSink(stateDir, kafka, zookeeper, latch);
Thread thread = new Thread() {
public void run() {
streams.start();
}
};
thread.start();
long startTime = System.currentTimeMillis();
while (latch.getCount() > 0) {
try {
latch.await();
} catch (InterruptedException ex) {
Thread.interrupted();
}
}
long endTime = System.currentTimeMillis();
System.out.println("Streams Performance [MB/sec read+write]: " + megaBytePerSec(endTime - startTime));
streams.close();
try {
thread.join();
} catch (Exception ex) {
// ignore
}
}
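
    /**
     * Measures source->store throughput: every record is written into a persistent key-value
     * store before the end-of-stream check, so the reported figure covers reading and storing.
     */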
public void processStreamWithStateStore() {
CountDownLatch latch = new CountDownLatch(1);
final KafkaStreams streams = createKafkaStreamsWithStateStore(stateDir, kafka, zookeeper, latch);
Thread thread = new Thread() {
public void run() {
streams.start();
}
};
thread.start();
long startTime = System.currentTimeMillis();
while (latch.getCount() > 0) {
try {
latch.await();
} catch (InterruptedException ex) {
Thread.interrupted();
}
}
long endTime = System.currentTimeMillis();
System.out.println("Streams Performance [MB/sec read+store]: " + megaBytePerSec(endTime - startTime));
streams.close();
try {
thread.join();
} catch (Exception ex) {
// ignore
}
}
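
    /**
     * Measures raw producer throughput: writes NUM_RECORDS records with sequential long keys and
     * VALUE_SIZE-byte values to SOURCE_TOPIC and reports MB/sec once the producer has been closed
     * (and therefore flushed).
     */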
public void produce() {
Properties props = new Properties();
props.put(ProducerConfig.CLIENT_ID_CONFIG, "simple-benchmark-produce");
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
KafkaProducer<Long, byte[]> producer = new KafkaProducer<>(props);
byte[] value = new byte[VALUE_SIZE];
long startTime = System.currentTimeMillis();
for (int i = 0; i < NUM_RECORDS; i++) {
producer.send(new ProducerRecord<>(SOURCE_TOPIC, (long) i, value));
}
producer.close();
long endTime = System.currentTimeMillis();
System.out.println("Producer Performance [MB/sec write]: " + megaBytePerSec(endTime - startTime));
}
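
    /**
     * Measures raw consumer throughput: manually assigns all partitions of SOURCE_TOPIC, seeks to
     * the beginning and polls until the last key has been seen and a poll comes back empty.
     */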
public void consume() {
Properties props = new Properties();
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "simple-benchmark-consumer");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
KafkaConsumer<Long, byte[]> consumer = new KafkaConsumer<>(props);
List<TopicPartition> partitions = getAllPartitions(consumer, SOURCE_TOPIC);
consumer.assign(partitions);
consumer.seekToBeginning(partitions);
Long key = null;
long startTime = System.currentTimeMillis();
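        // Keep polling; stop only when a poll returns nothing after END_KEY has already been observed.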
while (true) {
ConsumerRecords<Long, byte[]> records = consumer.poll(500);
if (records.isEmpty()) {
if (END_KEY.equals(key))
break;
} else {
for (ConsumerRecord<Long, byte[]> record : records) {
key = record.key();
}
}
}
long endTime = System.currentTimeMillis();
consumer.close();
System.out.println("Consumer Performance [MB/sec read]: " + megaBytePerSec(endTime - startTime));
}
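
    /**
     * Builds a single-threaded Streams topology that reads SOURCE_TOPIC with long keys and
     * byte-array values and counts the latch down when the processor receives END_KEY.
     */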
private KafkaStreams createKafkaStreams(File stateDir, String kafka, String zookeeper, final CountDownLatch latch) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams");
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KStreamBuilder builder = new KStreamBuilder();
KStream<Long, byte[]> source = builder.stream(LONG_SERDE, BYTE_SERDE, SOURCE_TOPIC);
source.process(new ProcessorSupplier<Long, byte[]>() {
@Override
public Processor<Long, byte[]> get() {
return new Processor<Long, byte[]>() {
@Override
public void init(ProcessorContext context) {
}
@Override
public void process(Long key, byte[] value) {
if (END_KEY.equals(key)) {
latch.countDown();
}
}
@Override
public void punctuate(long timestamp) {
}
@Override
public void close() {
}
};
}
});
return new KafkaStreams(builder, props);
}
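
    /**
     * Same topology as createKafkaStreams(), except the source stream is additionally piped to
     * SINK_TOPIC.
     */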
private KafkaStreams createKafkaStreamsWithSink(File stateDir, String kafka, String zookeeper, final CountDownLatch latch) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams-with-sink");
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KStreamBuilder builder = new KStreamBuilder();
KStream<Long, byte[]> source = builder.stream(LONG_SERDE, BYTE_SERDE, SOURCE_TOPIC);
source.to(LONG_SERDE, BYTE_SERDE, SINK_TOPIC);
source.process(new ProcessorSupplier<Long, byte[]>() {
@Override
public Processor<Long, byte[]> get() {
return new Processor<Long, byte[]>() {
@Override
public void init(ProcessorContext context) {
}
@Override
public void process(Long key, byte[] value) {
if (END_KEY.equals(key)) {
latch.countDown();
}
}
@Override
public void punctuate(long timestamp) {
}
@Override
public void close() {
}
};
}
});
return new KafkaStreams(builder, props);
}
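
    /**
     * Same read path as createKafkaStreams(), but a persistent key-value store named "store" is
     * added to the topology and the processor puts every record into it before the
     * end-of-stream check.
     */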
private KafkaStreams createKafkaStreamsWithStateStore(File stateDir, String kafka, String zookeeper, final CountDownLatch latch) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams-with-store");
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KStreamBuilder builder = new KStreamBuilder();
builder.addStateStore(Stores.create("store").withLongKeys().withByteArrayValues().persistent().build());
KStream<Long, byte[]> source = builder.stream(LONG_SERDE, BYTE_SERDE, SOURCE_TOPIC);
source.process(new ProcessorSupplier<Long, byte[]>() {
@Override
public Processor<Long, byte[]> get() {
return new Processor<Long, byte[]>() {
KeyValueStore<Long, byte[]> store;
@SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context) {
store = (KeyValueStore<Long, byte[]>) context.getStateStore("store");
}
@Override
public void process(Long key, byte[] value) {
store.put(key, value);
if (END_KEY.equals(key)) {
latch.countDown();
}
}
@Override
public void punctuate(long timestamp) {
}
@Override
public void close() {
}
};
}
}, "store");
return new KafkaStreams(builder, props);
}
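
    /**
     * Converts an elapsed time in milliseconds into throughput in MB/sec, assuming all
     * NUM_RECORDS records of RECORD_SIZE bytes were moved. For example, 10,000,000 records of
     * 108 bytes come to 1029 MB after integer truncation, so a 60,000 ms run reports about
     * 17.15 MB/sec.
     */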
private double megaBytePerSec(long time) {
return (double) (RECORD_SIZE * NUM_RECORDS / 1024 / 1024) / ((double) time / 1000);
}
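
    /**
     * Looks up every partition of the given topics so the consumer can be assigned to all of
     * them explicitly (no consumer group subscription is used).
     */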
private List<TopicPartition> getAllPartitions(KafkaConsumer<?, ?> consumer, String... topics) {
ArrayList<TopicPartition> partitions = new ArrayList<>();
for (String topic : topics) {
for (PartitionInfo info : consumer.partitionsFor(topic)) {
partitions.add(new TopicPartition(info.topic(), info.partition()));
}
}
return partitions;
}
}