blob: ead6c66861a0beadf400c6a672081b2922b56ba6 [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package com.datatorrent.benchmark.kafka;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.contrib.kafka.HighlevelKafkaConsumer;
import com.datatorrent.contrib.kafka.KafkaConsumer;
import com.datatorrent.contrib.kafka.SimpleKafkaConsumer;
* The stream app to test the benckmark of kafka
* You can set the property file to make it using either {@link SimpleKafkaConsumer} or {@link HighlevelKafkaConsumer}
* The performance are pretty close
* @since 0.9.3
@ApplicationAnnotation(name = "KafkaInputBenchmark")
public class KafkaInputBenchmark implements StreamingApplication
public static class CollectorModule extends BaseOperator
public final transient DefaultInputPort<String> inputPort = new DefaultInputPort<String>()
public void process(String arg0)
public void populateDAG(DAG dag, Configuration conf)
dag.setAttribute(DAG.APPLICATION_NAME, "KafkaInputOperatorPartitionDemo");
BenchmarkKafkaInputOperator bpkio = new BenchmarkKafkaInputOperator();
String type = conf.get("kafka.consumertype", "simple");
KafkaConsumer consumer = null;
if (type.equals("highlevel")) {
// Create template high-level consumer
Properties props = new Properties();
props.put("", "main_group");
props.put("auto.offset.reset", "smallest");
consumer = new HighlevelKafkaConsumer(props);
} else {
// topic is set via property file
consumer = new SimpleKafkaConsumer(null, 10000, 100000, "test_kafka_autop_client", null);
//bpkio.setTuplesBlast(1024 * 1024);
bpkio = dag.addOperator("KafkaBenchmarkConsumer", bpkio);
CollectorModule cm = dag.addOperator("DataBlackhole", CollectorModule.class);
dag.addStream("end", bpkio.oport, cm.inputPort).setLocality(Locality.CONTAINER_LOCAL);
dag.setInputPortAttribute(cm.inputPort, PortContext.PARTITION_PARALLEL, true);
dag.setAttribute(bpkio, OperatorContext.COUNTERS_AGGREGATOR, new KafkaConsumer.KafkaMeterStatsAggregator());
// dag.setAttribute(bpkio, OperatorContext.STATS_LISTENER, KafkaMeterStatsListener.class);