Merge branch 'dt-dev' into kafkainput
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaInputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkKafkaInputOperator.java
similarity index 79%
rename from benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaInputOperator.java
rename to benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkKafkaInputOperator.java
index 0426b54..585966a 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaInputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkKafkaInputOperator.java
@@ -18,7 +18,7 @@
import kafka.message.Message;
import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.contrib.kafka.AbstractPartitionableKafkaInputOperator;
+import com.datatorrent.contrib.kafka.AbstractKafkaInputOperator;
/**
* This operator emits one constant message for each kafka message received.
@@ -30,7 +30,7 @@
*
* @since 0.9.3
*/
-public class BenchmarkPartitionableKafkaInputOperator extends AbstractPartitionableKafkaInputOperator
+public class BenchmarkKafkaInputOperator extends AbstractKafkaInputOperator
{
/**
* The output port on which messages are emitted.
@@ -38,12 +38,6 @@
public transient DefaultOutputPort<String> oport = new DefaultOutputPort<String>();
@Override
- protected AbstractPartitionableKafkaInputOperator cloneOperator()
- {
- return new BenchmarkPartitionableKafkaInputOperator();
- }
-
- @Override
protected void emitTuple(Message message)
{
oport.emit("Received");
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java
index 3f82afd..3640e15 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java
@@ -29,6 +29,9 @@
import javax.validation.constraints.Min;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.InputOperator;
@@ -48,6 +51,8 @@
public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<BenchmarkPartitionableKafkaOutputOperator>, InputOperator, ActivationListener<OperatorContext>
{
+ private static final Logger logger = LoggerFactory.getLogger(BenchmarkPartitionableKafkaOutputOperator.class);
+
private String topic = "benchmark";
@Min(1)
@@ -77,6 +82,7 @@
@Override
public void run()
{
+ logger.info("Start produce data .... ");
Properties props = new Properties();
props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
props.setProperty("key.serializer.class", "kafka.serializer.StringEncoder");
@@ -85,7 +91,7 @@
props.setProperty("partitioner.class", KafkaTestPartitioner.class.getCanonicalName());
props.setProperty("producer.type", "async");
// props.setProperty("send.buffer.bytes", "1048576");
- props.setProperty("topic.metadata.refresh.interval.ms", "100000");
+ props.setProperty("topic.metadata.refresh.interval.ms", "10000");
if (producer == null) {
producer = new Producer<String, String>(new ProducerConfig(props));
@@ -164,6 +170,7 @@
public void activate(OperatorContext arg0)
{
+ logger.info("Activate the benchmark kafka output operator .... ");
constantMsg = new byte[msgSize];
for (int i = 0; i < constantMsg.length; i++) {
constantMsg[i] = (byte) ('a' + i%26);
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmark.java b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmark.java
index 18ef3f4..a6deefc 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmark.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmark.java
@@ -16,7 +16,7 @@
package com.datatorrent.benchmark.kafka;
-import java.util.HashSet;
+import com.google.common.collect.Sets;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
@@ -32,7 +32,7 @@
import com.datatorrent.contrib.kafka.HighlevelKafkaConsumer;
import com.datatorrent.contrib.kafka.KafkaConsumer;
import com.datatorrent.contrib.kafka.SimpleKafkaConsumer;
-import com.google.common.collect.Sets;
+
/**
* The stream app to test the benckmark of kafka
@@ -54,16 +54,14 @@
{
}
};
-
}
-
@Override
public void populateDAG(DAG dag, Configuration conf)
{
dag.setAttribute(DAG.APPLICATION_NAME, "KafkaInputOperatorPartitionDemo");
- BenchmarkPartitionableKafkaInputOperator bpkio = new BenchmarkPartitionableKafkaInputOperator();
+ BenchmarkKafkaInputOperator bpkio = new BenchmarkKafkaInputOperator();
String type = conf.get("kafka.consumertype", "simple");
@@ -75,19 +73,19 @@
// Create template high-level consumer
Properties props = new Properties();
- props.put("zookeeper.connect", conf.get("kafka.zookeeper"));
props.put("group.id", "main_group");
props.put("auto.offset.reset", "smallest");
consumer = new HighlevelKafkaConsumer(props);
} else {
// topic is set via property file
- consumer = new SimpleKafkaConsumer(null, 10000, 100000, "test_kafka_autop_client", new HashSet<Integer>());
+ consumer = new SimpleKafkaConsumer(null, 10000, 100000, "test_kafka_autop_client", null);
}
- consumer.setBrokerSet(Sets.newHashSet(conf.get("dt.kafka.brokerlist").split("\\s*,\\s*")));
+ bpkio.setZookeeper(conf.get("dt.kafka.zookeeper"));
bpkio.setInitialPartitionCount(1);
//bpkio.setTuplesBlast(1024 * 1024);
bpkio.setConsumer(consumer);
+
bpkio = dag.addOperator("KafkaBenchmarkConsumer", bpkio);
CollectorModule cm = dag.addOperator("DataBlackhole", CollectorModule.class);
diff --git a/benchmark/src/site/conf/dt-site-kafka.xml b/benchmark/src/site/conf/dt-site-kafka.xml
index 4db589f..38ef213 100644
--- a/benchmark/src/site/conf/dt-site-kafka.xml
+++ b/benchmark/src/site/conf/dt-site-kafka.xml
@@ -16,5 +16,10 @@
<name>dt.kafka.brokerlist</name>
<value>localhost:9092</value>
</property>
+
+ <property>
+ <name>dt.kafka.zookeeper</name>
+ <value>localhost:2181</value>
+ </property>
</configuration>
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
index 1dd682b..7e4211a 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
@@ -15,27 +15,97 @@
*/
package com.datatorrent.contrib.kafka;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.Operator.CheckpointListener;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.Stats;
+import com.datatorrent.api.StatsListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.Stateless;
+
+import static com.datatorrent.contrib.kafka.KafkaConsumer.KafkaMeterStatsUtil.getOffsetsForPartitions;
+import static com.datatorrent.contrib.kafka.KafkaConsumer.KafkaMeterStatsUtil.get_1minMovingAvgParMap;
+
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.base.Joiner;
+import com.google.common.base.Predicate;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
import javax.validation.Valid;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.cluster.Broker;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.Message;
+import kafka.message.MessageAndOffset;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.MutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.api.Operator.ActivationListener;
-import com.datatorrent.api.Operator.CheckpointListener;
-
/**
* This is a base implementation of a Kafka input operator, which consumes data from Kafka message bus.
* Subclasses should implement the method for emitting tuples to downstream operators.
+ * It will be dynamically partitioned based on the upstream kafka partition.
* <p>
+ * <b>Partition Strategy:</b>
+ * <p><b>1. ONE_TO_ONE partition</b> Each operator partition will consume from only one kafka partition </p>
+ * <p><b>2. ONE_TO_MANY partition</b> Each operator partition consumes from multiple kafka partitions with a hard ingestion rate limit</p>
+ * <p><b>3. ONE_TO_MANY_HEURISTIC partition</b> (Not implemented yet) Each operator partition consumes from multiple kafka partitions, and the number of operator partitions depends on a heuristic function (real-time bottleneck)</p>
+ * <p><b>Note:</b> ONE_TO_MANY partitioning only supports the simple kafka consumer because
+ * <p> 1) the high-level consumer can only balance the number of brokers it consumes from rather than the actual load from each broker</p>
+ * <p> 2) the high-level consumer cannot reset the offset once it has been committed, so the tuples are not replayable </p>
+ * <p></p>
+ * <br>
+ * <br>
+ * <b>Basic Algorithm:</b>
+ * <p>1. Pull the metadata (how many partitions) of the topic from the brokers of the {@link KafkaConsumer}, which are discovered via ZooKeeper</p>
+ * <p>2. The {@link KafkaConsumer} of a new operator partition is copied along with the operator and then assigned its kafka partitions and initial offsets</p>
+ * <p>3. A new {@link AbstractKafkaInputOperator} instance for each new operator partition is created as a Kryo deep copy of this operator (see createPartition)</p>
+ * <p>4. ONE_TO_MANY partitioning uses the first-fit decreasing algorithm (http://en.wikipedia.org/wiki/Bin_packing_problem) to minimize the number of operator partitions</p>
+ * <br>
+ * <br>
+ * <b>Load balance:</b> refer to {@link SimpleKafkaConsumer} and {@link HighlevelKafkaConsumer} <br>
+ * <b>Kafka partition failover:</b> refer to {@link SimpleKafkaConsumer} and {@link HighlevelKafkaConsumer}
+ * <br>
+ * <br>
+ * <b>Self adjust to Kafka partition change:</b>
+ * <p><b>EACH</b> operator partition periodically checks whether the leader broker(s) of the kafka partitions it consumes from have changed, and adjusts its connections without repartitioning</p>
+ * <p><b>ONLY THE APPMASTER</b> periodically checks the overall kafka partition layout and adds operator partitions when kafka partitions are added (kafka does not support deleting partitions for now)</p>
+ * <br>
+ * <br>
+ * </p>
* Properties:<br>
* <b>tuplesBlast</b>: Number of tuples emitted in each burst<br>
* <b>bufferSize</b>: Size of holding buffer<br>
@@ -50,15 +120,7 @@
* TBD<br>
* <br>
*
- * Shipped jars with this operator:<br>
- * <b>kafka.javaapi.consumer.SimpleConsumer.class</b> Official kafka consumer client <br>
- * <b>org.I0Itec.zkclient.ZkClient.class</b> Kafka client depends on this <br>
- * <b>scala.ScalaObject.class</b> Kafka client depends on this <br>
- * <b>com.yammer.matrics.Metrics.class</b> Kafka client depends on this <br> <br>
- *
- * Each operator can only consume 1 topic<br>
- * If you want partitionable operator refer to {@link AbstractPartitionableKafkaInputOperator}
- * <br>
+ * Each operator can consume 1 topic from multiple partitions and clusters<br>
* </p>
*
* @displayName Abstract Kafka Input
@@ -67,23 +129,69 @@
*
* @since 0.3.2
*/
-//SimpleConsumer is kafka consumer client used by this operator, zkclient is used by high-level kafka consumer
-public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implements InputOperator, ActivationListener<OperatorContext>, CheckpointListener
+
+@OperatorAnnotation(partitionable = true)
+public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implements InputOperator, ActivationListener<OperatorContext>, CheckpointListener, Partitioner<AbstractKafkaInputOperator<K>>, StatsListener
{
private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaInputOperator.class);
@Min(1)
private int maxTuplesPerWindow = Integer.MAX_VALUE;
private transient int emitCount = 0;
+ protected IdempotentStorageManager idempotentStorageManager;
+ protected transient long currentWindowId;
+ protected transient int operatorId;
+ protected final transient Map<KafkaPartition, MutablePair<Long, Integer>> currentWindowRecoveryState;
+ protected transient Map<KafkaPartition, Long> offsetStats = new HashMap<KafkaPartition, Long>();
+ private transient OperatorContext context = null;
+ private boolean idempotent = true;
+ // By default the partition policy is 1:1
+ public PartitionStrategy strategy = PartitionStrategy.ONE_TO_ONE;
+
+ // default resource is unlimited in terms of msgs per second
+ private long msgRateUpperBound = Long.MAX_VALUE;
+
+ // default resource is unlimited in terms of bytes per second
+ private long byteRateUpperBound = Long.MAX_VALUE;
+
+ // Store the current operator partition topology
+ private transient List<PartitionInfo> currentPartitionInfo = Lists.newLinkedList();
+
+ // Store the current collected kafka consumer stats
+ private transient Map<Integer, List<KafkaConsumer.KafkaMeterStats>> kafkaStatsHolder = new HashMap<Integer, List<KafkaConsumer.KafkaMeterStats>>();
+
+ private OffsetManager offsetManager = null;
+
+ // Minimal interval between 2 (re)partition actions
+ private long repartitionInterval = 30000L;
+
+ // Minimal interval between checking collected stats and decide whether it needs to repartition or not.
+ // And minimal interval between 2 offset updates
+ private long repartitionCheckInterval = 5000L;
+
+ private transient long lastCheckTime = 0L;
+
+ private transient long lastRepartitionTime = 0L;
+
+ // A list storing the newly discovered kafka partitions
+ private transient List<KafkaPartition> newWaitingPartition = new LinkedList<KafkaPartition>();
+
+ @Min(1)
+ private int initialPartitionCount = 1;
+
@NotNull
@Valid
protected KafkaConsumer consumer = new SimpleKafkaConsumer();
+ /**
+ * Creates the operator with a file-system backed {@link IdempotentStorageManager}.
+ * setup() replaces it with a no-op manager when the consumer is not a {@link SimpleKafkaConsumer}
+ * or idempotency is disabled.
+ */
+ public AbstractKafkaInputOperator()
+ {
+ idempotentStorageManager = new IdempotentStorageManager.FSIdempotentStorageManager();
+ // per-window recovery state: kafka partition -> (offset of first emitted message, number of emitted messages)
+ currentWindowRecoveryState = new HashMap<KafkaPartition, MutablePair<Long, Integer>>();
+ }
+
/**
- * Any concrete class derived from KafkaInputOperator has to implement this method
- * so that it knows what type of message it is going to send to Malhar in which output port.
+ * Any concrete class derived from KafkaInputOperator has to implement this method to emit tuples to an output port.
*
- * @param message
*/
protected abstract void emitTuple(Message message);
@@ -97,88 +205,179 @@
this.maxTuplesPerWindow = maxTuplesPerWindow;
}
- /**
- * Implement Component Interface.
- *
- * @param context
- */
+ /**
+ * Creates the kafka consumer and sets up the idempotent storage manager for this operator partition.
+ * Replay of recovered windows only works with a {@link SimpleKafkaConsumer} and idempotency enabled;
+ * otherwise a no-op storage manager is installed.
+ */
@Override
public void setup(OperatorContext context)
{
+ // replay() fetches by offset through a SimpleKafkaConsumer, so other consumers get no recovery state
+ if(!(getConsumer() instanceof SimpleKafkaConsumer) || !idempotent) {
+ idempotentStorageManager = new IdempotentStorageManager.NoopIdempotentStorageManager();
+ }
logger.debug("consumer {} topic {} cacheSize {}", consumer, consumer.getTopic(), consumer.getCacheSize());
consumer.create();
+ // kept for activate() and endWindow()
+ this.context = context;
+ operatorId = context.getId();
+ idempotentStorageManager.setup(context);
}
- /**
- * Implement Component Interface.
- */
+ /** Tears down the idempotent storage manager and then the kafka consumer. */
@Override
public void teardown()
{
+ idempotentStorageManager.teardown();
consumer.teardown();
}
- /**
- * Implement Operator Interface.
- */
+ /**
+ * Records the current window id. If the id is at or below the largest window saved by the idempotent
+ * storage manager, that window's tuples are replayed here; emitTuples() emits nothing for such windows.
+ */
@Override
public void beginWindow(long windowId)
{
+ currentWindowId = windowId;
+ if (windowId <= idempotentStorageManager.getLargestRecoveryWindow()) {
+ replay(windowId);
+ }
emitCount = 0;
}
- /**
- * Implement Operator Interface.
- */
+ /**
+ * Re-emits the tuples of a window recovered from the idempotent storage manager.
+ * For every kafka partition in the recovered state, the partition leader is asked for messages starting at
+ * the saved first offset, and the saved number of messages is emitted. When the largest recovery window is
+ * replayed, the consumer is repositioned just past the last replayed offsets and started.
+ *
+ * @param windowId the window being replayed
+ * @throws RuntimeException wrapping an IOException from loading the recovery state
+ */
+ protected void replay(long windowId)
+ {
+ try {
+ @SuppressWarnings("unchecked")
+ Map<KafkaPartition, MutablePair<Long, Integer>> recoveredData = (Map<KafkaPartition, MutablePair<Long, Integer>>) idempotentStorageManager.load(operatorId, windowId);
+ if (recoveredData == null) {
+ return;
+ }
+
+ Map<String, List<PartitionMetadata>> pms = KafkaMetadataUtil.getPartitionsForTopic(getConsumer().brokers, getConsumer().topic);
+ if (pms == null) {
+ return;
+ }
+
+ SimpleKafkaConsumer cons = (SimpleKafkaConsumer)getConsumer();
+ for (Map.Entry<KafkaPartition, MutablePair<Long, Integer>> rc: recoveredData.entrySet()) {
+ KafkaPartition kp = rc.getKey();
+ // a missing cluster, or a partition missing from it, is skipped (previously NPE / NoSuchElementException)
+ PartitionMetadata pm = findPartitionMetadata(pms.get(kp.getClusterId()), kp.getPartitionId());
+ if (pm == null) {
+ continue;
+ }
+
+ Broker bk = pm.leader();
+ long startOffset = rc.getValue().left;
+
+ // Build a fresh request per partition: partitions may be led by different brokers, and a shared
+ // builder would keep re-sending the fetches of every previously visited partition
+ FetchRequest req = new FetchRequestBuilder().clientId(cons.getClientId())
+ .addFetch(consumer.topic, kp.getPartitionId(), startOffset, cons.getBufferSize()).build();
+
+ SimpleConsumer ksc = new SimpleConsumer(bk.host(), bk.port(), cons.getTimeout(), cons.getBufferSize(), cons.getClientId());
+ try {
+ FetchResponse fetchResponse = ksc.fetch(req);
+ Integer expectedCount = rc.getValue().right;
+ int count = 0;
+ for (MessageAndOffset msg : fetchResponse.messageSet(consumer.topic, kp.getPartitionId())) {
+ // a compressed message set may start before the requested offset; those messages were not
+ // part of this window and must not be re-emitted
+ if (msg.offset() < startOffset) {
+ continue;
+ }
+ emitTuple(msg.message());
+ offsetStats.put(kp, msg.offset());
+ count++;
+ if (expectedCount != null && count == expectedCount) {
+ break;
+ }
+ }
+ }
+ finally {
+ // release the socket opened for this partition
+ ksc.close();
+ }
+ }
+ if(windowId == idempotentStorageManager.getLargestRecoveryWindow()) {
+ // Set the offset positions to the consumer
+ Map<KafkaPartition, Long> currentOffsets = new HashMap<KafkaPartition, Long>(cons.getCurrentOffsets());
+ // Resume right after the last replayed message of each partition
+ for (Map.Entry<KafkaPartition, Long> e: offsetStats.entrySet()) {
+ currentOffsets.put(e.getKey(), e.getValue() + 1);
+ }
+
+ cons.resetOffset(currentOffsets);
+ cons.start();
+ }
+ }
+
+ catch (IOException e) {
+ throw new RuntimeException("replay", e);
+ }
+ }
+
+ // Returns the metadata entry for partitionId, or null when metadataList is null or has no such partition
+ private static PartitionMetadata findPartitionMetadata(List<PartitionMetadata> metadataList, int partitionId)
+ {
+ if (metadataList == null) {
+ return null;
+ }
+ for (PartitionMetadata pm : metadataList) {
+ if (pm.partitionId() == partitionId) {
+ return pm;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * For windows past the recovery range, publishes the simple consumer's stats as operator counters and
+ * persists this window's per-partition (first offset, message count) state. The per-window state is
+ * cleared in every case.
+ */
@Override
public void endWindow()
{
+ if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) {
+ try {
+ if((getConsumer() instanceof SimpleKafkaConsumer)) {
+ SimpleKafkaConsumer cons = (SimpleKafkaConsumer) getConsumer();
+ context.setCounters(cons.getConsumerStats(offsetStats));
+ }
+ idempotentStorageManager.save(currentWindowRecoveryState, operatorId, currentWindowId);
+ }
+ catch (IOException e) {
+ throw new RuntimeException("saving recovery", e);
+ }
+ }
+ currentWindowRecoveryState.clear();
}
+ /** Commits the consumer's offsets once the given window has been checkpointed. */
@Override
public void checkpointed(long windowId)
{
- // commit the kafka consumer offset
+ // commit the consumer offset
getConsumer().commitOffset();
}
+ /**
+ * Purges the saved recovery state of this operator up to {@code windowId}. Inclusivity is whatever
+ * IdempotentStorageManager.deleteUpTo defines; confirm there.
+ */
@Override
public void committed(long windowId)
{
+ try {
+ idempotentStorageManager.deleteUpTo(operatorId, windowId);
+ }
+ catch (IOException e) {
+ throw new RuntimeException("deleting state", e);
+ }
}
- /**
- * Implement ActivationListener Interface.
- */
+ /**
+ * Starts the kafka consumer, unless the activation window lies before the largest recovery window.
+ * In that case replay() starts the consumer after the last recovered window has been replayed.
+ * Note: the activation window is read from the context stored in setup(), not from {@code ctx}.
+ */
@Override
public void activate(OperatorContext ctx)
{
+ if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID && context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < idempotentStorageManager.getLargestRecoveryWindow()) {
+ // If it is a replay state, don't start the consumer
+ return;
+ }
// Don't start thread here!
- // Because how many threads we want to start for kafka consumer depends on the type of kafka client and the message metadata(topic/partition/replica)
+ // # of kafka_consumer_threads depends on the type of kafka client and the message
+ // metadata(topic/partition/replica) layout
consumer.start();
}
- /**
- * Implement ActivationListener Interface.
- */
+ /** Stops the kafka consumer. */
@Override
public void deactivate()
{
consumer.stop();
}
- /**
- * Implement InputOperator Interface.
- */
+ /**
+ * Emits up to maxTuplesPerWindow tuples per window from the consumer's buffer; nothing is emitted during
+ * replay windows. A message whose offset is not greater than the last emitted offset of its partition is
+ * dropped as a duplicate. Every emitted message is added to this window's recovery state.
+ * NOTE(review): emitCount also counts dropped duplicates, so they use up the per-window quota.
+ */
@Override
public void emitTuples()
{
+ if (currentWindowId <= idempotentStorageManager.getLargestRecoveryWindow()) {
+ return;
+ }
int count = consumer.messageSize();
if (maxTuplesPerWindow > 0) {
count = Math.min(count, maxTuplesPerWindow - emitCount);
}
for (int i = 0; i < count; i++) {
- emitTuple(consumer.pollMessage());
+ KafkaConsumer.KafkaMessage message = consumer.pollMessage();
+ // Ignore the duplicate messages
+ if(offsetStats.containsKey(message.kafkaPart) && message.offSet <= offsetStats.get(message.kafkaPart))
+ continue;
+ emitTuple(message.msg);
+ offsetStats.put(message.kafkaPart, message.offSet);
+ // first message of the partition in this window records its offset; later ones bump the count
+ MutablePair<Long, Integer> offsetAndCount = currentWindowRecoveryState.get(message.kafkaPart);
+ if(offsetAndCount == null) {
+ currentWindowRecoveryState.put(message.kafkaPart, new MutablePair<Long, Integer>(message.offSet, 1));
+ } else {
+ offsetAndCount.setRight(offsetAndCount.right+1);
+ }
}
emitCount += count;
}
@@ -193,20 +392,561 @@
return consumer;
}
- //add topic as operator property
+ // add topic as operator property
+ /** Sets the kafka topic to consume; delegates to the consumer. */
public void setTopic(String topic)
{
this.consumer.setTopic(topic);
}
- //add brokerlist as operator property
- public void setBrokerSet(String brokerString)
+ /**
+ * Set the ZooKeeper quorum of the Kafka cluster(s) you want to consume data from.
+ * The operator will discover the brokers that it needs to consume messages from.
+ * <p>
+ * Format: {@code cluster1::zk1,zk2:port1;cluster2::zk3:port2}, or {@code zk1:port1,zk2:port2} for the
+ * default cluster. A host without an explicit port uses the port of the closest host listed after it.
+ *
+ * @param zookeeperString the ZooKeeper connect string(s)
+ * @throws IllegalArgumentException if a host has no port and no later host of its cluster supplies one
+ */
+ public void setZookeeper(String zookeeperString)
{
- Set<String> brokerSet = new HashSet<String>();
- for (String broker : brokerString.split(",")) {
- brokerSet.add(broker);
+ SetMultimap<String, String> theClusters = HashMultimap.create();
+ for (String zk : zookeeperString.split(";")) {
+ String[] parts = zk.split("::");
+ String clusterId = parts.length == 1 ? KafkaPartition.DEFAULT_CLUSTERID : parts[0];
+ String[] hostNames = parts.length == 1 ? parts[0].split(",") : parts[1].split(",");
+ // walk backwards so that a port given on a later host carries over to earlier hosts without one
+ String portId = "";
+ for (int idx = hostNames.length - 1; idx >= 0; idx--) {
+ String[] zkParts = hostNames[idx].split(":");
+ if (zkParts.length == 2) {
+ portId = zkParts[1];
+ }
+ // isEmpty() only: comparing Strings with != tests references, not contents
+ if (!portId.isEmpty()) {
+ theClusters.put(clusterId, zkParts[0] + ":" + portId);
+ } else {
+ throw new IllegalArgumentException("Wrong zookeeper string: " + zookeeperString + "\n"
+ + " Expected format should be cluster1::zookeeper1,zookeeper2:port1;cluster2::zookeeper3:port2 or zookeeper1:port1,zookeeper:port2");
+ }
+ }
}
- this.consumer.setBrokerSet(brokerSet);
+ this.consumer.setZookeeper(theClusters);
}
+ /**
+ * Callback after the operator has been (re)partitioned; records when it happened.
+ * NOTE(review): presumably compared against repartitionInterval in processStats — confirm.
+ */
+ @Override
+ public void partitioned(Map<Integer, Partitioner.Partition<AbstractKafkaInputOperator<K>>> partitions)
+ {
+ // update the last repartition time
+ lastRepartitionTime = System.currentTimeMillis();
+ }
+
+ /**
+ * Defines the operator partitions according to {@link #strategy}.
+ * <ul>
+ * <li>ONE_TO_ONE: initially one operator partition per kafka partition; later calls only add operator
+ * partitions for newly discovered kafka partitions (otherwise null is returned — NOTE(review): confirm the
+ * platform treats null as "no change").</li>
+ * <li>ONE_TO_MANY: initially spreads the kafka partitions round-robin over initialPartitionCount operator
+ * partitions. Later calls either add one operator partition for all newly discovered kafka partitions, or
+ * rebuild the whole layout with first-fit decreasing bin packing under msgRateUpperBound/byteRateUpperBound.</li>
+ * </ul>
+ * The storage managers of created partitions and the ids of replaced partitions are reported to the
+ * {@link IdempotentStorageManager}.
+ */
+ @Override
+ public Collection<Partitioner.Partition<AbstractKafkaInputOperator<K>>> definePartitions(Collection<Partitioner.Partition<AbstractKafkaInputOperator<K>>> partitions, Partitioner.PartitioningContext context)
+ {
+ // Initialize brokers from zookeepers
+ getConsumer().initBrokers();
+
+ // check if it's the initial partition
+ boolean isInitialParitition = partitions.iterator().next().getStats() == null;
+
+ // get partition metadata for topics.
+ // Whether the operator uses the high-level or the simple kafka consumer, the metadata of the topic is always retrieved
+ // from the brokers of the KafkaConsumer (initialized from ZooKeeper above)
+ Map<String, List<PartitionMetadata>> kafkaPartitions = KafkaMetadataUtil.getPartitionsForTopic(getConsumer().brokers, getConsumer().getTopic());
+
+ // Operator partitions
+ List<Partitioner.Partition<AbstractKafkaInputOperator<K>>> newPartitions = null;
+
+ // initialize the offset
+ Map<KafkaPartition, Long> initOffset = null;
+ if(isInitialParitition && offsetManager !=null){
+ initOffset = offsetManager.loadInitialOffsets();
+ // Joiner cannot join a null map; don't let a logging statement fail the partitioning
+ if (initOffset != null) {
+ logger.info("Initial offsets: {} ", "{ " + Joiner.on(", ").useForNull("").withKeyValueSeparator(": ").join(initOffset) + " }");
+ }
+ }
+
+ Collection<IdempotentStorageManager> newManagers = Sets.newHashSet();
+ Set<Integer> deletedOperators = Sets.newHashSet();
+
+ switch (strategy) {
+
+ // For the 1 to 1 mapping The framework will create number of operator partitions based on kafka topic partitions
+ // Each operator partition will consume from only one kafka partition
+ case ONE_TO_ONE:
+
+ if (isInitialParitition) {
+ lastRepartitionTime = System.currentTimeMillis();
+ logger.info("[ONE_TO_ONE]: Initializing partition(s)");
+
+ // initialize the number of operator partitions according to number of kafka partitions
+
+ newPartitions = new LinkedList<Partitioner.Partition<AbstractKafkaInputOperator<K>>>();
+ for (Map.Entry<String, List<PartitionMetadata>> kp : kafkaPartitions.entrySet()) {
+ String clusterId = kp.getKey();
+ for (PartitionMetadata pm : kp.getValue()) {
+ logger.info("[ONE_TO_ONE]: Create operator partition for cluster {}, topic {}, kafka partition {} ", clusterId, getConsumer().topic, pm.partitionId());
+ newPartitions.add(createPartition(Sets.newHashSet(new KafkaPartition(clusterId, consumer.topic, pm.partitionId())), initOffset, newManagers));
+ }
+ }
+
+ }
+ else if (newWaitingPartition.size() != 0) {
+ // add partition for new kafka partition
+ for (KafkaPartition newPartition : newWaitingPartition) {
+ logger.info("[ONE_TO_ONE]: Add operator partition for cluster {}, topic {}, partition {}", newPartition.getClusterId(), getConsumer().topic, newPartition.getPartitionId());
+ partitions.add(createPartition(Sets.newHashSet(newPartition), null, newManagers));
+ }
+ newWaitingPartition.clear();
+ idempotentStorageManager.partitioned(newManagers, deletedOperators);
+ return partitions;
+
+ }
+ break;
+ // For the 1 to N mapping The initial partition number is defined by stream application
+ // Afterwards, the framework will dynamically adjust the partition and allocate consumers to as less operator partitions as it can
+ // and guarantee the total intake rate for each operator partition is below some threshold
+ case ONE_TO_MANY:
+
+ if (getConsumer() instanceof HighlevelKafkaConsumer) {
+ throw new UnsupportedOperationException("[ONE_TO_MANY]: The high-level consumer is not supported for ONE_TO_MANY partition strategy.");
+ }
+
+ if (isInitialParitition) {
+ lastRepartitionTime = System.currentTimeMillis();
+ logger.info("[ONE_TO_MANY]: Initializing partition(s)");
+ int size = initialPartitionCount;
+ @SuppressWarnings("unchecked")
+ Set<KafkaPartition>[] kps = (Set<KafkaPartition>[]) Array.newInstance((new HashSet<KafkaPartition>()).getClass(), size);
+ newPartitions = new ArrayList<Partitioner.Partition<AbstractKafkaInputOperator<K>>>(size);
+ // distribute the kafka partitions round-robin over the initial operator partitions
+ int i = 0;
+ for (Map.Entry<String, List<PartitionMetadata>> en : kafkaPartitions.entrySet()) {
+ String clusterId = en.getKey();
+ for (PartitionMetadata pm : en.getValue()) {
+ if (kps[i % size] == null) {
+ kps[i % size] = new HashSet<KafkaPartition>();
+ }
+ kps[i % size].add(new KafkaPartition(clusterId, consumer.topic, pm.partitionId()));
+ i++;
+ }
+ }
+ // NOTE(review): with fewer kafka partitions than initialPartitionCount, some kps[i] stay null and are
+ // passed to createPartition as-is — confirm what resetPartitionsAndOffset(null, ...) means
+ for (i = 0; i < kps.length; i++) {
+ logger.info("[ONE_TO_MANY]: Create operator partition for kafka partition(s): {} ", StringUtils.join(kps[i], ", "));
+ newPartitions.add(createPartition(kps[i], initOffset, newManagers));
+ }
+
+ }
+ else if (newWaitingPartition.size() != 0) {
+
+ logger.info("[ONE_TO_MANY]: Add operator partition for kafka partition(s): {} ", StringUtils.join(newWaitingPartition, ", "));
+ partitions.add(createPartition(Sets.newHashSet(newWaitingPartition), null, newManagers));
+ // like ONE_TO_ONE: the waiting partitions are now assigned and must not be added again on the next call
+ newWaitingPartition.clear();
+ idempotentStorageManager.partitioned(newManagers, deletedOperators);
+ return partitions;
+ }
+ else {
+
+ logger.info("[ONE_TO_MANY]: Repartition the operator(s) under " + msgRateUpperBound + " msgs/s and " + byteRateUpperBound + " bytes/s hard limit");
+ // size of the list depends on the load and capacity of each operator
+ newPartitions = new LinkedList<Partitioner.Partition<AbstractKafkaInputOperator<K>>>();
+
+ // Use first-fit decreasing algorithm to minimize the container number and somewhat balance the partition
+ // try to balance the load and minimize the number of containers with each container's load under the threshold
+ // the partition based on the latest 1 minute moving average
+ Map<KafkaPartition, long[]> kPIntakeRate = new HashMap<KafkaPartition, long[]>();
+ // get the offset for all partitions of each consumer
+ Map<KafkaPartition, Long> offsetTrack = new HashMap<KafkaPartition, Long>();
+ for (Partitioner.Partition<AbstractKafkaInputOperator<K>> partition : partitions) {
+ List<Stats.OperatorStats> opss = partition.getStats().getLastWindowedStats();
+ if (opss == null || opss.size() == 0) {
+ continue;
+ }
+ offsetTrack.putAll(partition.getPartitionedInstance().consumer.getCurrentOffsets());
+ // Get the latest stats
+
+ Stats.OperatorStats stat = partition.getStats().getLastWindowedStats().get(partition.getStats().getLastWindowedStats().size() - 1);
+ if (stat.counters instanceof KafkaConsumer.KafkaMeterStats) {
+ KafkaConsumer.KafkaMeterStats kms = (KafkaConsumer.KafkaMeterStats) stat.counters;
+ kPIntakeRate.putAll(get_1minMovingAvgParMap(kms));
+ }
+ }
+
+ List<PartitionInfo> partitionInfos = firstFitDecreasingAlgo(kPIntakeRate);
+
+ // Add the existing partition Ids to the deleted operators
+ for(Partitioner.Partition<AbstractKafkaInputOperator<K>> op : partitions)
+ {
+ deletedOperators.add(op.getPartitionedInstance().operatorId);
+ }
+ for (PartitionInfo r : partitionInfos) {
+ logger.info("[ONE_TO_MANY]: Create operator partition for kafka partition(s): " + StringUtils.join(r.kpids, ", ") + ", topic: " + this.getConsumer().topic);
+ newPartitions.add(createPartition(r.kpids, offsetTrack, newManagers));
+ }
+ // The whole layout was replaced. createPartition() appended a plain entry per new partition on top of
+ // the entries of the replaced partitions, so reset the topology to exactly the (rate-annotated) new layout
+ currentPartitionInfo.clear();
+ currentPartitionInfo.addAll(partitionInfos);
+ }
+ break;
+
+ case ONE_TO_MANY_HEURISTIC:
+ throw new UnsupportedOperationException("[ONE_TO_MANY_HEURISTIC]: Not implemented yet");
+ default:
+ break;
+ }
+
+ idempotentStorageManager.partitioned(newManagers, deletedOperators);
+ return newPartitions;
+ }
+
+ /**
+ * Creates a new operator partition from a Kryo deep copy of this operator.
+ * The copy's consumer is reset to the given kafka partitions and initial offsets, its storage manager is
+ * added to {@code newManagers}, and the assignment is appended to currentPartitionInfo.
+ *
+ * @param pIds the kafka partitions the new operator partition consumes from
+ * @param initOffets the initial offsets to start from; callers pass null for newly discovered partitions
+ * @param newManagers collects the idempotent storage managers of created partitions
+ * @return the new operator partition
+ */
+ protected Partitioner.Partition<AbstractKafkaInputOperator<K>> createPartition(Set<KafkaPartition> pIds, Map<KafkaPartition, Long> initOffsets, Collection<IdempotentStorageManager> newManagers)
+ {
+ // serialize and deserialize this operator to obtain an independent copy
+ Kryo kryo = new Kryo();
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ Output output = new Output(bos);
+ kryo.writeObject(output, this);
+ output.close();
+ Input lInput = new Input(bos.toByteArray());
+ @SuppressWarnings("unchecked")
+ Partitioner.Partition<AbstractKafkaInputOperator<K>> p = new DefaultPartition<AbstractKafkaInputOperator<K>>(kryo.readObject(lInput, this.getClass()));
+ p.getPartitionedInstance().getConsumer().resetPartitionsAndOffset(pIds, initOffsets);
+ newManagers.add(p.getPartitionedInstance().idempotentStorageManager);
+
+ PartitionInfo pif = new PartitionInfo();
+ pif.kpids = pIds;
+ currentPartitionInfo.add(pif);
+ return p;
+ }
+
+ /**
+ * Packs kafka partitions into as few operator partitions as possible using first-fit decreasing.
+ * Each operator partition stays under msgRateUpperBound msgs/s and byteRateUpperBound bytes/s.
+ *
+ * @param kPIntakeRate per kafka partition: {msgs/s, bytes/s}
+ * @return one PartitionInfo per operator partition, holding its kafka partitions and remaining capacity
+ */
+ private List<PartitionInfo> firstFitDecreasingAlgo(final Map<KafkaPartition, long[]> kPIntakeRate)
+ {
+ // (Decreasing) Sort the map by msgs/s and bytes/s in descending order.
+ // Without a msgs/s limit (or on equal msgs/s), the order is by bytes/s; otherwise it is by msgs/s.
+ List<Map.Entry<KafkaPartition, long[]>> sortedMapEntry = new LinkedList<Map.Entry<KafkaPartition, long[]>>(kPIntakeRate.entrySet());
+ Collections.sort(sortedMapEntry, new Comparator<Map.Entry<KafkaPartition, long[]>>()
+ {
+ @Override
+ public int compare(Map.Entry<KafkaPartition, long[]> firstEntry, Map.Entry<KafkaPartition, long[]> secondEntry)
+ {
+ long[] firstPair = firstEntry.getValue();
+ long[] secondPair = secondEntry.getValue();
+ // Long.compare instead of (int) (b - a): the long subtraction can overflow and the int cast truncates,
+ // which yields wrong signs (and zero for unequal rates) and breaks the Comparator contract
+ if (msgRateUpperBound == Long.MAX_VALUE || firstPair[0] == secondPair[0]) {
+ return Long.compare(secondPair[1], firstPair[1]);
+ } else {
+ return Long.compare(secondPair[0], firstPair[0]);
+ }
+ }
+ });
+
+ // (First-fit) Look for first fit operator to assign the consumer
+ // Go over all the kafka partitions and look for the right operator to assign to
+ // Each record has a set of kafka partition ids and the resource left for that operator after assigned the consumers for those partitions
+ List<PartitionInfo> pif = new LinkedList<PartitionInfo>();
+ outer:
+ for (Map.Entry<KafkaPartition, long[]> entry : sortedMapEntry) {
+ long[] resourceRequired = entry.getValue();
+ for (PartitionInfo r : pif) {
+ if (r.msgRateLeft > resourceRequired[0] && r.byteRateLeft > resourceRequired[1]) {
+ // found first fit operator partition that has enough resource for this consumer
+ // add consumer to the operator partition
+ r.kpids.add(entry.getKey());
+ // update the resource left in this partition (an unlimited bound stays unlimited)
+ r.msgRateLeft -= r.msgRateLeft == Long.MAX_VALUE ? 0 : resourceRequired[0];
+ r.byteRateLeft -= r.byteRateLeft == Long.MAX_VALUE ? 0 : resourceRequired[1];
+ continue outer;
+ }
+ }
+ // didn't find the existing "operator" to assign this consumer
+ PartitionInfo nr = new PartitionInfo();
+ nr.kpids = Sets.newHashSet(entry.getKey());
+ nr.msgRateLeft = msgRateUpperBound == Long.MAX_VALUE ? msgRateUpperBound : msgRateUpperBound - resourceRequired[0];
+ nr.byteRateLeft = byteRateUpperBound == Long.MAX_VALUE ? byteRateUpperBound : byteRateUpperBound - resourceRequired[1];
+ pif.add(nr);
+ }
+
+ return pif;
+ }
+
+  @Override
+  public StatsListener.Response processStats(StatsListener.BatchedOperatorStats stats)
+  {
+    // decide on (re)partitioning from the kafka meter stats of this operator partition's last windows
+    StatsListener.Response response = new StatsListener.Response();
+    response.repartitionRequired = isPartitionRequired(stats.getOperatorId(), extractKafkaStats(stats));
+    return response;
+  }
+
+  private void updateOffsets(List<KafkaConsumer.KafkaMeterStats> kstats)
+  {
+    if (offsetManager == null) {
+      return; // no offset manager configured, nothing to update
+    }
+    offsetManager.updateOffsets(getOffsetsForPartitions(kstats));
+  }
+
+  private List<KafkaConsumer.KafkaMeterStats> extractKafkaStats(StatsListener.BatchedOperatorStats stats)
+  {
+    List<KafkaConsumer.KafkaMeterStats> meterStats = new LinkedList<KafkaConsumer.KafkaMeterStats>();
+    for (Stats.OperatorStats windowStats : stats.getLastWindowedStats()) {
+      Object counters = windowStats == null ? null : windowStats.counters; // windows without stats are skipped
+      if (counters instanceof KafkaConsumer.KafkaMeterStats) {
+        meterStats.add((KafkaConsumer.KafkaMeterStats) counters);
+      }
+    }
+    return meterStats;
+  }
+
+  /**
+   * Check whether the operator needs repartition based on reported stats; also pushes consumed offsets to the OffsetManager.
+   *
+   * @param opid id of the operator partition that reported kstats
+   * @param kstats kafka meter stats extracted from that partition's last windows
+   * @return true if repartition is required, false otherwise
+   */
+  private boolean isPartitionRequired(int opid, List<KafkaConsumer.KafkaMeterStats> kstats)
+  {
+    // invoked from processStats once per stats report of an operator partition
+    long t = System.currentTimeMillis();
+
+    if (t - lastCheckTime < repartitionCheckInterval) {
+      // return false if it's within repartitionCheckInterval since the stats were last fully checked
+      return false;
+    }
+
+    logger.debug("Use OffsetManager to update offsets");
+    updateOffsets(kstats);
+    // NOTE(review): lastCheckTime only advances in the finally block below; the early returns before the try
+    // leave it unchanged, so offsets are pushed on every stats report in those cases; confirm this is intended
+    if(repartitionInterval < 0){
+      // if repartition is disabled
+      return false;
+    }
+
+    if(t - lastRepartitionTime < repartitionInterval) {
+      // return false if it's still within repartitionInterval since last (re)partition
+      return false;
+    }
+
+    // remember this partition's latest stats; a decision needs stats from every current operator partition
+    kafkaStatsHolder.put(opid, kstats);
+    // NOTE(review): assumes one PartitionInfo per operator partition; the ONE_TO_MANY repartition path records each twice (createPartition + addAll), verify
+    if (kafkaStatsHolder.size() != currentPartitionInfo.size() || currentPartitionInfo.size() == 0) {
+      // skip checking if the operator hasn't collected all the stats from all the current partitions
+      return false;
+    }
+
+    try {
+      // whatever the outcome below, the finally block records this as the last full check
+      // detect kafka partitions of the topic that are not yet assigned to any operator partition
+      {
+        Set<KafkaPartition> existingIds = new HashSet<KafkaPartition>();
+        for (PartitionInfo pio : currentPartitionInfo) {
+          existingIds.addAll(pio.kpids);
+        }
+        // the metadata map's String key (presumably the cluster id; confirm) becomes part of the KafkaPartition identity
+        for (Map.Entry<String, List<PartitionMetadata>> en : KafkaMetadataUtil.getPartitionsForTopic(consumer.brokers, consumer.getTopic()).entrySet()) {
+          for (PartitionMetadata pm : en.getValue()) {
+            KafkaPartition pa = new KafkaPartition(en.getKey(), consumer.topic, pm.partitionId());
+            if(!existingIds.contains(pa)){
+              newWaitingPartition.add(pa);
+            }
+          }
+        }
+        if (newWaitingPartition.size() != 0) {
+          // found new kafka partition
+          lastRepartitionTime = t;
+          return true;
+        }
+      }
+      // ONE_TO_ONE only repartitions to pick up new kafka partitions, which is handled above
+      if (strategy == PartitionStrategy.ONE_TO_ONE) {
+        return false;
+      }
+
+      // This is the expensive part, run at most once per repartitionCheckInterval: check the overall partition layout
+      // and see if there is a more optimal solution
+      // The decision is made by 2 constraints
+      // Hard constraint: this partition's msgs/s or bytes/s stays above its upper bound in every window
+      // Soft constraint: the collected stats suggest a different partition count
+      // on a repartition, the collected topology and stats are dropped so they can be rebuilt from scratch
+      boolean b = breakHardConstraint(kstats) || breakSoftConstraint();
+      if (b) {
+        currentPartitionInfo.clear();
+        kafkaStatsHolder.clear();
+      }
+      return b;
+    } finally {
+      // update last check time
+      lastCheckTime = System.currentTimeMillis();
+    }
+  }
+
+  /**
+   * Check whether the collected statistics suggest a different partition count, using only windows reported by every operator partition.
+   * @return True if at least one complete window exists and every complete window indicates a different partition count.
+   */
+  private boolean breakSoftConstraint()
+  {
+    if (kafkaStatsHolder.isEmpty() || kafkaStatsHolder.size() != currentPartitionInfo.size()) {
+      return false;
+    }
+    // a window only covers all kafka partitions if every operator partition reported it, so use the shortest history
+    int length = Integer.MAX_VALUE;
+    for (List<KafkaConsumer.KafkaMeterStats> operatorStats : kafkaStatsHolder.values()) {
+      length = Math.min(length, operatorStats.size());
+    }
+    for (int j = 0; j < length; j++) {
+      Map<KafkaPartition, long[]> kPIntakeRate = new HashMap<KafkaPartition, long[]>();
+      for (List<KafkaConsumer.KafkaMeterStats> operatorStats : kafkaStatsHolder.values()) {
+        kPIntakeRate.putAll(get_1minMovingAvgParMap(operatorStats.get(j)));
+      }
+      if (kPIntakeRate.size() == 0) {
+        return false;
+      }
+      List<PartitionInfo> partitionInfo = firstFitDecreasingAlgo(kPIntakeRate);
+      if (partitionInfo.size() == 0 || partitionInfo.size() == currentPartitionInfo.size()) {
+        return false;
+      }
+    }
+    return length > 0; // no complete window means no evidence for repartitioning
+  }
+
+  /**
+   * Check if all the statistics within the windows break the upper bound hard limit in msgs/s or bytes/s.
+   * <p>
+   * A window only counts against a limit when the reporting operator consumes from more than one kafka
+   * partition, because whole kafka partitions are the unit that gets assigned to operator partitions. The two
+   * limits are judged independently: some windows over the msgs/s limit and the others over the bytes/s limit
+   * do not add up to a breach.
+   * </p>
+   *
+   * @param kmss kafka meter stats of one operator partition's last windows; may be null or empty
+   * @return True if every window is above the msgs/s limit, or every window is above the bytes/s limit.
+   */
+  private boolean breakHardConstraint(List<KafkaConsumer.KafkaMeterStats> kmss)
+  {
+    // without any kafka meter stats there is nothing to base a repartition on
+    if (kmss == null || kmss.isEmpty()) {
+      return false;
+    }
+
+    boolean everyWindowOverMsgLimit = true;
+    boolean everyWindowOverByteLimit = true;
+    for (KafkaConsumer.KafkaMeterStats kms : kmss) {
+      // an operator reading from a single kafka partition is never counted as over a limit
+      boolean splittable = kms.partitionStats.size() > 1;
+      boolean overMsgLimit = splittable && kms.totalMsgPerSec > msgRateUpperBound;
+      boolean overByteLimit = splittable && kms.totalBytesPerSec > byteRateUpperBound;
+      everyWindowOverMsgLimit = everyWindowOverMsgLimit && overMsgLimit;
+      everyWindowOverByteLimit = everyWindowOverByteLimit && overByteLimit;
+
+      if (!everyWindowOverMsgLimit && !everyWindowOverByteLimit) {
+        // neither limit can be exceeded in every window any more, the remaining windows don't matter
+        break;
+      }
+    }
+
+    // a breach of either hard limit in every window requires a repartition
+    return everyWindowOverMsgLimit || everyWindowOverByteLimit;
+  }
+
+  public enum PartitionStrategy
+  {
+    /**
+     * Each operator partition consumes from exactly one kafka partition.
+     */
+    ONE_TO_ONE,
+    /**
+     * Each operator partition consumes from several kafka partitions, keeping its overall intake under a hard limit in msgs/s or bytes/s.
+     * For now this strategy is <b>only</b> supported with the <b>simple kafka consumer</b>.
+     */
+    ONE_TO_MANY,
+    /**
+     * 1 to N partition based on a heuristic function.
+     * <b>NOT</b> implemented yet: partitioning throws UnsupportedOperationException for this strategy.
+     * TODO implement this later
+     */
+    ONE_TO_MANY_HEURISTIC
+  }
+
+  static class PartitionInfo
+  {
+    Set<KafkaPartition> kpids; // kafka partitions assigned to one operator partition
+    long msgRateLeft; // remaining msgs/s capacity during first-fit packing; Long.MAX_VALUE means unlimited
+    long byteRateLeft; // remaining bytes/s capacity during first-fit packing; Long.MAX_VALUE means unlimited
+  }
+
+  public IdempotentStorageManager getIdempotentStorageManager()
+  {
+    return this.idempotentStorageManager;
+  }
+  // Storage manager that every new operator partition gets a copy of and that is told about partition changes
+  public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager)
+  {
+    this.idempotentStorageManager = idempotentStorageManager;
+  }
+  // Number of operator partitions requested when the operator is first partitioned
+  public void setInitialPartitionCount(int partitionCount)
+  {
+    this.initialPartitionCount = partitionCount;
+  }
+  // Returns the requested initial number of operator partitions
+  public int getInitialPartitionCount()
+  {
+    return this.initialPartitionCount;
+  }
+  // Hard msgs/s limit per operator partition; Long.MAX_VALUE means unlimited
+  public long getMsgRateUpperBound()
+  {
+    return this.msgRateUpperBound;
+  }
+  // Sets the hard msgs/s limit per operator partition (Long.MAX_VALUE = unlimited)
+  public void setMsgRateUpperBound(long msgRateUpperBound)
+  {
+    this.msgRateUpperBound = msgRateUpperBound;
+  }
+  // Hard bytes/s limit per operator partition; Long.MAX_VALUE means unlimited
+  public long getByteRateUpperBound()
+  {
+    return this.byteRateUpperBound;
+  }
+  // Sets the hard bytes/s limit per operator partition (Long.MAX_VALUE = unlimited)
+  public void setByteRateUpperBound(long byteRateUpperBound)
+  {
+    this.byteRateUpperBound = byteRateUpperBound;
+  }
+  // The initial offset is stored on the consumer, not on this operator
+  public void setInitialOffset(String initialOffset)
+  {
+    this.consumer.initialOffset = initialOffset;
+  }
+  // Optional; when set it receives the consumed offsets during stats checks (null disables offset updates)
+  public void setOffsetManager(OffsetManager offsetManager)
+  {
+    this.offsetManager = offsetManager;
+  }
+  // Minimal time in ms between two full stats checks
+  public void setRepartitionCheckInterval(long repartitionCheckInterval)
+  {
+    this.repartitionCheckInterval = repartitionCheckInterval;
+  }
+  // Returns the minimal time in ms between two full stats checks
+  public long getRepartitionCheckInterval()
+  {
+    return this.repartitionCheckInterval;
+  }
+  // Minimal time in ms between two (re)partitions; a negative value disables repartitioning
+  public void setRepartitionInterval(long repartitionInterval)
+  {
+    this.repartitionInterval = repartitionInterval;
+  }
+  // Returns the minimal time in ms between two (re)partitions (negative = disabled)
+  public long getRepartitionInterval()
+  {
+    return this.repartitionInterval;
+  }
+
+  // Accepts ONE_TO_ONE, ONE_TO_MANY or ONE_TO_MANY_HEURISTIC in any case (locale-independent); unknown names throw IllegalArgumentException
+  public void setStrategy(String policy)
+  {
+    this.strategy = PartitionStrategy.valueOf(policy.toUpperCase(java.util.Locale.ENGLISH));
+  }
+
+  public boolean isIdempotent()
+  {
+    return this.idempotent;
+  }
+  // Sets the idempotent flag reported by isIdempotent()
+  public void setIdempotent(boolean idempotent)
+  {
+    this.idempotent = idempotent;
+  }
}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java
index 7e1b252..ecca9d3 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java
@@ -96,7 +96,7 @@
configProperties.putAll(prop);
return new ProducerConfig(configProperties);
- };
+ }
public Producer<K, V> getProducer()
{
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractPartitionableKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractPartitionableKafkaInputOperator.java
deleted file mode 100644
index 6509b79..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractPartitionableKafkaInputOperator.java
+++ /dev/null
@@ -1,662 +0,0 @@
-/*
- * Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.contrib.kafka;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import kafka.javaapi.PartitionMetadata;
-
-import javax.validation.constraints.Min;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Sets;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.commons.lang3.StringUtils;
-
-import com.datatorrent.contrib.kafka.KafkaConsumer.KafkaMeterStats;
-
-import static com.datatorrent.contrib.kafka.KafkaConsumer.KafkaMeterStatsUtil.*;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultPartition;
-import com.datatorrent.api.Partitioner;
-import com.datatorrent.api.Stats.OperatorStats;
-import com.datatorrent.api.StatsListener;
-import com.datatorrent.api.annotation.OperatorAnnotation;
-
-/**
- * This is a base implementation of a Kafka input operator, which consumes data from Kafka message bus.
- * It will be dynamically partitioned based on the upstream kafka partition.
- * <p>
- * <b>Partition Strategy:</b>
- * <p><b>1. ONE_TO_ONE partition</b> Each operator partition will consume from only one kafka partition </p>
- * <p><b>2. ONE_TO_MANY partition</b> Each operator partition consumer from multiple kafka partition with some hard ingestion rate limit</p>
- * <p><b>3. ONE_TO_MANY_HEURISTIC partition</b>(Not implemented yet) Each operator partition consumer from multiple kafka partition and partition number depends on heuristic function(real time bottle neck)</p>
- * <p><b>Note:</b> ONE_TO_MANY partition only support simple kafka consumer because
- * <p> 1) high-level consumer can only balance the number of brokers it consumes from rather than the actual load from each broker</p>
- * <p> 2) high-level consumer can not reset offset once it's committed so the tuples are not replayable </p>
- * <p></p>
- * <br>
- * <br>
- * <b>Basic Algorithm:</b>
- * <p>1.Pull the metadata(how many partitions) of the topic from brokerList of {@link KafkaConsumer}</p>
- * <p>2.cloneConsumer method is used to initialize the new {@link KafkaConsumer} instance for the new partition operator</p>
- * <p>3.cloneOperator method is used to initialize the new {@link AbstractPartitionableKafkaInputOperator} instance for the new partition operator</p>
- * <p>4.ONE_TO_MANY partition use first-fit decreasing algorithm(http://en.wikipedia.org/wiki/Bin_packing_problem) to minimize the partition operator
- * <br>
- * <br>
- * <b>Load balance:</b> refer to {@link SimpleKafkaConsumer} and {@link HighlevelKafkaConsumer} <br>
- * <b>Kafka partition failover:</b> refer to {@link SimpleKafkaConsumer} and {@link HighlevelKafkaConsumer}
- * <br>
- * <br>
- * <b>Self adjust to Kafka partition change:</b>
- * <p><b>EACH</b> operator partition periodically check the leader broker(s) change which it consumes from and adjust connection without repartition</p>
- * <p><b>ONLY APPMASTER</b> operator periodically check overall kafka partition layout and add operator partition due to kafka partition add(no delete supported by kafka for now)</p>
- * <br>
- * <br>
- * </p>
- *
- * @displayName Abstract Partitionable Kafka Input
- * @category Messaging
- * @tags input operator
- *
- * @since 0.9.0
- */
-@OperatorAnnotation(partitionable = true)
-public abstract class AbstractPartitionableKafkaInputOperator extends AbstractKafkaInputOperator<KafkaConsumer> implements Partitioner<AbstractPartitionableKafkaInputOperator>, StatsListener
-{
-
- // By default the partition policy is 1:1
- public PartitionStrategy strategy = PartitionStrategy.ONE_TO_ONE;
-
- private transient OperatorContext context = null;
-
- // default resource is unlimited in terms of msgs per second
- private long msgRateUpperBound = Long.MAX_VALUE;
-
- // default resource is unlimited in terms of bytes per second
- private long byteRateUpperBound = Long.MAX_VALUE;
-
- private static final Logger logger = LoggerFactory.getLogger(AbstractPartitionableKafkaInputOperator.class);
-
- // Store the current partition topology
- private transient List<PartitionInfo> currentPartitionInfo = new LinkedList<AbstractPartitionableKafkaInputOperator.PartitionInfo>();
-
- // Store the current collected kafka consumer stats
- private transient Map<Integer, List<KafkaMeterStats>> kafkaStatsHolder = new HashMap<Integer, List<KafkaConsumer.KafkaMeterStats>>();
-
- private OffsetManager offsetManager = null;
-
- // Minimal interval between 2 (re)partition actions
- private long repartitionInterval = 30000L;
-
- // Minimal interval between checking collected stats and decide whether it needs to repartition or not.
- // And minimal interval between 2 offset updates
- private long repartitionCheckInterval = 5000L;
-
- private transient long lastCheckTime = 0L;
-
- private transient long lastRepartitionTime = 0L;
-
- private transient List<Integer> newWaitingPartition = new LinkedList<Integer>();
-
- @Min(1)
- private int initialPartitionCount = 1;
-
- @Override
- public void partitioned(Map<Integer, Partition<AbstractPartitionableKafkaInputOperator>> partitions)
- {
- // update the last repartition time
- lastRepartitionTime = System.currentTimeMillis();
- }
-
- @Override
- public Collection<Partition<AbstractPartitionableKafkaInputOperator>> definePartitions(Collection<Partition<AbstractPartitionableKafkaInputOperator>> partitions, PartitioningContext context)
- {
-
- // check if it's the initial partition
- boolean isInitialParitition = partitions.iterator().next().getStats() == null;
-
- // get partition metadata for topics.
- // Whatever operator is using high-level or simple kafka consumer, the operator always create a temporary simple kafka consumer to get the metadata of the topic
- // The initial value of brokerList of the KafkaConsumer is used to retrieve the topic metadata
- List<PartitionMetadata> kafkaPartitionList = KafkaMetadataUtil.getPartitionsForTopic(getConsumer().getBrokerSet(), getConsumer().getTopic());
-
- // Operator partitions
- List<Partition<AbstractPartitionableKafkaInputOperator>> newPartitions = null;
-
- // initialize the offset
- Map<Integer, Long> initOffset = null;
- if(isInitialParitition && offsetManager !=null){
- initOffset = offsetManager.loadInitialOffsets();
- logger.info("Initial offsets: {} ", "{ " + Joiner.on(", ").useForNull("").withKeyValueSeparator(": ").join(initOffset) + " }");
- }
-
- switch (strategy) {
-
- // For the 1 to 1 mapping The framework will create number of operator partitions based on kafka topic partitions
- // Each operator partition will consume from only one kafka partition
- case ONE_TO_ONE:
-
- if (isInitialParitition) {
- lastRepartitionTime = System.currentTimeMillis();
- logger.info("[ONE_TO_ONE]: Initializing partition(s)");
-
- // initialize the number of operator partitions according to number of kafka partitions
-
- newPartitions = new ArrayList<Partition<AbstractPartitionableKafkaInputOperator>>(kafkaPartitionList.size());
- for (int i = 0; i < kafkaPartitionList.size(); i++) {
- logger.info("[ONE_TO_ONE]: Create operator partition for kafka partition: " + kafkaPartitionList.get(i).partitionId() + ", topic: " + this.getConsumer().topic);
- Partition<AbstractPartitionableKafkaInputOperator> p = new DefaultPartition<AbstractPartitionableKafkaInputOperator>(cloneOperator());
- PartitionMetadata pm = kafkaPartitionList.get(i);
- KafkaConsumer newConsumerForPartition = getConsumer().cloneConsumer(Sets.newHashSet(pm.partitionId()), initOffset);
- p.getPartitionedInstance().setConsumer(newConsumerForPartition);
- PartitionInfo pif = new PartitionInfo();
- pif.kpids = Sets.newHashSet(pm.partitionId());
- currentPartitionInfo.add(pif);
- newPartitions.add(p);
- }
- }
- else if (newWaitingPartition.size() != 0) {
- // add partition for new kafka partition
- for (int pid : newWaitingPartition) {
- logger.info("[ONE_TO_ONE]: Add operator partition for kafka partition " + pid);
- Partition<AbstractPartitionableKafkaInputOperator> p = new DefaultPartition<AbstractPartitionableKafkaInputOperator>(cloneOperator());
- KafkaConsumer newConsumerForPartition = getConsumer().cloneConsumer(Sets.newHashSet(pid));
- p.getPartitionedInstance().setConsumer(newConsumerForPartition);
- PartitionInfo pif = new PartitionInfo();
- pif.kpids = Sets.newHashSet(pid);
- currentPartitionInfo.add(pif);
- partitions.add(p);
- }
- newWaitingPartition.clear();
- return partitions;
-
- }
- break;
- // For the 1 to N mapping The initial partition number is defined by stream application
- // Afterwards, the framework will dynamically adjust the partition and allocate consumers to as less operator partitions as it can
- // and guarantee the total intake rate for each operator partition is below some threshold
- case ONE_TO_MANY:
-
- if (getConsumer() instanceof HighlevelKafkaConsumer) {
- throw new UnsupportedOperationException("[ONE_TO_MANY]: The high-level consumer is not supported for ONE_TO_MANY partition strategy.");
- }
-
- if (isInitialParitition) {
- lastRepartitionTime = System.currentTimeMillis();
- logger.info("[ONE_TO_MANY]: Initializing partition(s)");
- int size = initialPartitionCount;
- @SuppressWarnings("unchecked")
- Set<Integer>[] pIds = new Set[size];
- newPartitions = new ArrayList<Partition<AbstractPartitionableKafkaInputOperator>>(size);
- for (int i = 0; i < kafkaPartitionList.size(); i++) {
- PartitionMetadata pm = kafkaPartitionList.get(i);
- if (pIds[i % size] == null) {
- pIds[i % size] = new HashSet<Integer>();
- }
- pIds[i % size].add(pm.partitionId());
- }
- for (int i = 0; i < pIds.length; i++) {
- logger.info("[ONE_TO_MANY]: Create operator partition for kafka partition(s): " + StringUtils.join(pIds[i], ", ") + ", topic: " + this.getConsumer().topic);
- Partition<AbstractPartitionableKafkaInputOperator> p = new DefaultPartition<AbstractPartitionableKafkaInputOperator>(_cloneOperator());
- KafkaConsumer newConsumerForPartition = getConsumer().cloneConsumer(pIds[i], initOffset);
- p.getPartitionedInstance().setConsumer(newConsumerForPartition);
- newPartitions.add(p);
- PartitionInfo pif = new PartitionInfo();
- pif.kpids = pIds[i];
- currentPartitionInfo.add(pif);
- }
-
- }
- else if (newWaitingPartition.size() != 0) {
-
- logger.info("[ONE_TO_MANY]: Add operator partition for kafka partition(s): " + StringUtils.join(newWaitingPartition, ", ") + ", topic: " + this.getConsumer().topic);
- Partition<AbstractPartitionableKafkaInputOperator> p = new DefaultPartition<AbstractPartitionableKafkaInputOperator>(_cloneOperator());
- KafkaConsumer newConsumerForPartition = getConsumer().cloneConsumer(Sets.newHashSet(newWaitingPartition));
- p.getPartitionedInstance().setConsumer(newConsumerForPartition);
- partitions.add(p);
- PartitionInfo pif = new PartitionInfo();
- pif.kpids = Sets.newHashSet(newWaitingPartition);
- currentPartitionInfo.add(pif);
- newWaitingPartition.clear();
- return partitions;
- }
- else {
-
- logger.info("[ONE_TO_MANY]: Repartition the operator(s) under " + msgRateUpperBound + " msgs/s and " + byteRateUpperBound + " bytes/s hard limit");
- // size of the list depends on the load and capacity of each operator
- newPartitions = new LinkedList<Partition<AbstractPartitionableKafkaInputOperator>>();
-
- // Use first-fit decreasing algorithm to minimize the container number and somewhat balance the partition
- // try to balance the load and minimize the number of containers with each container's load under the threshold
- // the partition based on the latest 1 minute moving average
- Map<Integer, long[]> kPIntakeRate = new HashMap<Integer, long[]>();
- // get the offset for all partitions of each consumer
- Map<Integer, Long> offsetTrack = new HashMap<Integer, Long>();
- for (Partition<AbstractPartitionableKafkaInputOperator> partition : partitions) {
- List<OperatorStats> opss = partition.getStats().getLastWindowedStats();
- if (opss == null || opss.size() == 0) {
- continue;
- }
- offsetTrack.putAll(partition.getPartitionedInstance().consumer.getCurrentOffsets());
- // Get the latest stats
-
- OperatorStats stat = partition.getStats().getLastWindowedStats().get(partition.getStats().getLastWindowedStats().size() - 1);
- if (stat.counters instanceof KafkaMeterStats) {
- KafkaMeterStats kms = (KafkaMeterStats) stat.counters;
- kPIntakeRate.putAll(get_1minMovingAvgParMap(kms));
- }
- }
-
- List<PartitionInfo> partitionInfos = firstFitDecreasingAlgo(kPIntakeRate);
-
- for (PartitionInfo r : partitionInfos) {
- logger.info("[ONE_TO_MANY]: Create operator partition for kafka partition(s): " + StringUtils.join(r.kpids, ", ") + ", topic: " + this.getConsumer().topic);
- Partition<AbstractPartitionableKafkaInputOperator> p = new DefaultPartition<AbstractPartitionableKafkaInputOperator>(_cloneOperator());
- KafkaConsumer newConsumerForPartition = getConsumer().cloneConsumer(r.kpids, offsetTrack);
- p.getPartitionedInstance().setConsumer(newConsumerForPartition);
- newPartitions.add(p);
- }
-
- currentPartitionInfo.addAll(partitionInfos);
- }
-
- break;
-
- case ONE_TO_MANY_HEURISTIC:
- throw new UnsupportedOperationException("[ONE_TO_MANY_HEURISTIC]: Not implemented yet");
- default:
- break;
- }
-
- return newPartitions;
- }
-
- private List<PartitionInfo> firstFitDecreasingAlgo(final Map<Integer, long[]> kPIntakeRate)
- {
- // (Decreasing) Sort the map by msgs/s and bytes/s in descending order
- List<Entry<Integer, long[]>> sortedMapEntry = new LinkedList<Entry<Integer, long[]>>(kPIntakeRate.entrySet());
- Collections.sort(sortedMapEntry, new Comparator<Entry<Integer, long[]>>()
- {
- @Override
- public int compare(Entry<Integer, long[]> firstEntry, Entry<Integer, long[]> secondEntry)
- {
- long[] firstPair = firstEntry.getValue();
- long[] secondPair = secondEntry.getValue();
- if (msgRateUpperBound == Long.MAX_VALUE || firstPair[0] == secondPair[0]) {
- return (int) (secondPair[1] - firstPair[1]);
- }
- else {
- return (int) (secondPair[0] - firstPair[0]);
- }
- }
- });
-
- // (First-fit) Look for first fit operator to assign the consumer
- // Go over all the kafka partitions and look for the right operator to assign to
- // Each record has a set of kafka partition ids and the resource left for that operator after assigned the consumers for those partitions
- List<PartitionInfo> pif = new LinkedList<PartitionInfo>();
- outer:
- for (Entry<Integer, long[]> entry : sortedMapEntry) {
- long[] resourceRequired = entry.getValue();
- for (PartitionInfo r : pif) {
- if (r.msgRateLeft > resourceRequired[0] && r.byteRateLeft > resourceRequired[1]) {
- // found first fit operator partition that has enough resource for this consumer
- // add consumer to the operator partition
- r.kpids.add(entry.getKey());
- // update the resource left in this partition
- r.msgRateLeft -= r.msgRateLeft == Long.MAX_VALUE ? 0 : resourceRequired[0];
- r.byteRateLeft -= r.byteRateLeft == Long.MAX_VALUE ? 0 : resourceRequired[1];
- continue outer;
- }
- }
- // didn't find the existing "operator" to assign this consumer
- PartitionInfo nr = new PartitionInfo();
- nr.kpids = Sets.newHashSet(entry.getKey());
- nr.msgRateLeft = msgRateUpperBound == Long.MAX_VALUE ? msgRateUpperBound : msgRateUpperBound - resourceRequired[0];
- nr.byteRateLeft = byteRateUpperBound == Long.MAX_VALUE ? byteRateUpperBound : byteRateUpperBound - resourceRequired[1];
- pif.add(nr);
- }
-
- return pif;
- }
-
- @Override
- public Response processStats(BatchedOperatorStats stats)
- {
-
- Response resp = new Response();
- List<KafkaMeterStats> kstats = extractKafkaStats(stats);
- resp.repartitionRequired = needPartition(stats.getOperatorId(), kstats);
- return resp;
- }
-
- private void updateOffsets(List<KafkaMeterStats> kstats)
- {
- //In every partition check interval, call offsetmanager to update the offsets
- if (offsetManager != null) {
- offsetManager.updateOffsets(getOffsetsForPartitions(kstats));
- }
- }
-
- private List<KafkaMeterStats> extractKafkaStats(BatchedOperatorStats stats)
- {
- //preprocess the stats
- List<KafkaMeterStats> kmsList = new LinkedList<KafkaConsumer.KafkaMeterStats>();
- for (OperatorStats os : stats.getLastWindowedStats()) {
- if (os != null && os.counters instanceof KafkaMeterStats) {
- kmsList.add((KafkaMeterStats) os.counters);
- }
- }
- return kmsList;
- }
-
- /**
- *
- * Check whether the operator needs repartition based on reported stats
- *
- * @param stats
- * @return true if repartition is required
- * false if repartition is not required
- */
- private boolean needPartition(int opid, List<KafkaMeterStats> kstats)
- {
-
- long t = System.currentTimeMillis();
-
- if (t - lastCheckTime < repartitionCheckInterval) {
- // return false if it's within repartitionCheckInterval since last time it check the stats
- return false;
- }
-
- logger.debug("Use OffsetManager to update offsets");
- updateOffsets(kstats);
-
-
- if(repartitionInterval < 0){
- // if repartition is disabled
- return false;
- }
-
- if(t - lastRepartitionTime < repartitionInterval) {
- // return false if it's still within repartitionInterval since last (re)partition
- return false;
- }
-
-
- kafkaStatsHolder.put(opid, kstats);
-
- if (kafkaStatsHolder.size() != currentPartitionInfo.size() || currentPartitionInfo.size() == 0) {
- // skip checking if the operator hasn't collected all the stats from all the current partitions
- return false;
- }
-
- try {
-
- // monitor if new kafka partition added
- {
- Set<Integer> existingIds = new HashSet<Integer>();
- for (PartitionInfo pio : currentPartitionInfo) {
- existingIds.addAll(pio.kpids);
- }
-
- for (PartitionMetadata metadata : KafkaMetadataUtil.getPartitionsForTopic(consumer.brokerSet, consumer.getTopic())) {
- if (!existingIds.contains(metadata.partitionId())) {
- newWaitingPartition.add(metadata.partitionId());
- }
- }
- if (newWaitingPartition.size() != 0) {
- // found new kafka partition
- lastRepartitionTime = t;
- return true;
- }
- }
-
- if (strategy == PartitionStrategy.ONE_TO_ONE) {
- return false;
- }
-
- // This is expensive part and only every repartitionCheckInterval it will check existing the overall partitions
- // and see if there is more optimal solution
- // The decision is made by 2 constraint
- // Hard constraint which is upper bound overall msgs/s or bytes/s
- // Soft constraint which is more optimal solution
-
- boolean b = breakHardConstraint(kstats) || breakSoftConstraint();
- if (b) {
- currentPartitionInfo.clear();
- kafkaStatsHolder.clear();
- }
- return b;
- } finally {
- // update last check time
- lastCheckTime = System.currentTimeMillis();
- }
- }
-
- /**
- * Check to see if there is other more optimal(less partition) partition assignment based on current statistics
- *
- * @return True if all windowed stats indicate different partition size we need to adjust the partition.
- */
- private boolean breakSoftConstraint()
- {
- if (kafkaStatsHolder.size() != currentPartitionInfo.size()) {
- return false;
- }
- int length = kafkaStatsHolder.get(kafkaStatsHolder.keySet().iterator().next()).size();
- for (int j = 0; j < length; j++) {
- Map<Integer, long[]> kPIntakeRate = new HashMap<Integer, long[]>();
- for (Integer pid : kafkaStatsHolder.keySet()) {
- kPIntakeRate.putAll(get_1minMovingAvgParMap(kafkaStatsHolder.get(pid).get(j)));
- }
- if (kPIntakeRate.size() == 0) {
- return false;
- }
- List<PartitionInfo> partitionInfo = firstFitDecreasingAlgo(kPIntakeRate);
- if (partitionInfo.size() == 0 || partitionInfo.size() == currentPartitionInfo.size()) {
- return false;
- }
- }
- // if all windowed stats indicate different partition size we need to adjust the partition
- return true;
- }
-
- /**
- * Check if all the statistics within the windows break the upper bound hard limit in msgs/s or bytes/s
- *
- * @param kmss
- * @return True if all the statistics within the windows break the upper bound hard limit in msgs/s or bytes/s.
- */
- private boolean breakHardConstraint(List<KafkaMeterStats> kmss)
- {
- // Only care about the KafkaMeterStats
-
- // if there is no kafka meter stats at all, don't repartition
- if (kmss == null || kmss.size() == 0) {
- return false;
- }
- // if all the stats within the window have msgs/s above the upper bound threshold (hard limit)
- boolean needRP = Iterators.all(kmss.iterator(), new Predicate<KafkaMeterStats>()
- {
- @Override
- public boolean apply(KafkaMeterStats kms)
- {
- // If there are more than 1 kafka partition and the total msg/s reach the limit
- return kms.partitionStats.size() > 1 && kms.totalMsgPerSec > msgRateUpperBound;
- }
- });
-
- // or all the stats within the window have bytes/s above the upper bound threshold (hard limit)
- needRP = needRP || Iterators.all(kmss.iterator(), new Predicate<KafkaMeterStats>()
- {
- @Override
- public boolean apply(KafkaMeterStats kms)
- {
- //If there are more than 1 kafka partition and the total bytes/s reach the limit
- return kms.partitionStats.size() > 1 && kms.totalBytesPerSec > byteRateUpperBound;
- }
- });
-
- return needRP;
-
- }
-
- private final AbstractPartitionableKafkaInputOperator _cloneOperator()
- {
- AbstractPartitionableKafkaInputOperator newOp = cloneOperator();
- newOp.msgRateUpperBound = this.msgRateUpperBound;
- newOp.byteRateUpperBound = this.byteRateUpperBound;
- newOp.strategy = this.strategy;
- newOp.setMaxTuplesPerWindow(getMaxTuplesPerWindow());
- return newOp;
- }
-
- /**
- * Implement this method to initialize new operator instance for new partition.
- * Please carefully include all the properties you want to keep in new instance
- *
- * @return
- */
- protected abstract AbstractPartitionableKafkaInputOperator cloneOperator();
-
- @Override
- public void setup(OperatorContext context)
- {
- super.setup(context);
- this.context = context;
- }
-
- @Override
- public void endWindow()
- {
-
- super.endWindow();
-
- if (strategy == PartitionStrategy.ONE_TO_MANY) {
- //send the stats to AppMaster and let the AppMaster decide if it wants to repartition
- context.setCounters(getConsumer().getConsumerStats());
- }
- }
-
- public static enum PartitionStrategy
- {
- /**
- * Each operator partition connect to only one kafka partition
- */
- ONE_TO_ONE,
- /**
- * Each operator consumes from several kafka partitions with overall input rate under some certain hard limit in msgs/s or bytes/s
- * For now it <b>only</b> support <b>simple kafka consumer</b>
- */
- ONE_TO_MANY,
- /**
- * 1 to N partition based on the heuristic function
- * <b>NOT</b> implemented yet
- * TODO implement this later
- */
- ONE_TO_MANY_HEURISTIC
- }
-
- public void setInitialPartitionCount(int partitionCount)
- {
- this.initialPartitionCount = partitionCount;
- }
-
- public int getInitialPartitionCount()
- {
- return initialPartitionCount;
- }
-
- public long getMsgRateUpperBound()
- {
- return msgRateUpperBound;
- }
-
- public void setMsgRateUpperBound(long msgRateUpperBound)
- {
- this.msgRateUpperBound = msgRateUpperBound;
- }
-
- public long getByteRateUpperBound()
- {
- return byteRateUpperBound;
- }
-
- public void setByteRateUpperBound(long byteRateUpperBound)
- {
- this.byteRateUpperBound = byteRateUpperBound;
- }
-
- public void setInitialOffset(String initialOffset)
- {
- this.consumer.initialOffset = initialOffset;
- }
-
- public void setOffsetManager(OffsetManager offsetManager)
- {
- this.offsetManager = offsetManager;
- }
-
- public void setRepartitionCheckInterval(long repartitionCheckInterval)
- {
- this.repartitionCheckInterval = repartitionCheckInterval;
- }
-
- public long getRepartitionCheckInterval()
- {
- return repartitionCheckInterval;
- }
-
- public void setRepartitionInterval(long repartitionInterval)
- {
- this.repartitionInterval = repartitionInterval;
- }
-
- public long getRepartitionInterval()
- {
- return repartitionInterval;
- }
-
- //@Pattern(regexp="ONE_TO_ONE|ONE_TO_MANY|ONE_TO_MANY_HEURISTIC", flags={Flag.CASE_INSENSITIVE})
- public void setStrategy(String policy)
- {
- this.strategy = PartitionStrategy.valueOf(policy.toUpperCase());
- }
-
- static class PartitionInfo
- {
- Set<Integer> kpids;
- long msgRateLeft;
- long byteRateLeft;
- }
-
-}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractPartitionableKafkaSinglePortInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractPartitionableKafkaSinglePortInputOperator.java
deleted file mode 100644
index cc7914d..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractPartitionableKafkaSinglePortInputOperator.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.contrib.kafka;
-
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OperatorAnnotation;
-import kafka.message.Message;
-
-/**
- * This is the base implementation of Kafka input operator, with a single output port, which consumes data from Kafka message bus.
- * It will be dynamically partitioned based on the upstream Kafka partition.
- * Subclasses should implement the methods which convert Kafka messages to tuples.
- * <p></p>
- * @displayName Abstract Partitionable Kafka Single Port Input
- * @category Messaging
- * @tags input operator
- *
- * @since 0.9.0
- */
-@OperatorAnnotation(partitionable = true)
-public abstract class AbstractPartitionableKafkaSinglePortInputOperator<T> extends AbstractPartitionableKafkaInputOperator
-{
- /**
- * This output port emits tuples extracted from Kafka messages.
- */
- public final transient DefaultOutputPort<T> outputPort = new DefaultOutputPort<T>();
-
- /**
- * Any concrete class derived from AbstractPartitionableKafkaSinglePortInputOperator has to implement this method
- * so that it knows what type of message it is going to send to Malhar.
- * It converts a ByteBuffer message into a Tuple. A Tuple can be of any type (derived from Java Object) that
- * operator user intends to.
- *
- * @param msg
- */
- public abstract T getTuple(Message msg);
-
- /**
- * Implement abstract method.
- */
- @Override
- public void emitTuple(Message msg)
- {
- outputPort.emit(getTuple(msg));
- }
-}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java
index 6e2092f..d84a145 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java
@@ -18,166 +18,215 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import javax.validation.constraints.Min;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
+import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;
/**
- * High level kafka consumer adapter used for kafka input operator
- * Properties:<br>
+ * High level kafka consumer adapter used for kafka input operator. Properties:<br>
* <b>consumerConfig</b>: Used for create the high-level kafka consumer<br>
* <b>numStream</b>: num of threads to consume the topic in parallel <br>
- * <li> (-1): create #partition thread and consume the topic in parallel threads</li>
- * <br>
+ * <li>null/empty: create one stream per partition of the topic on each cluster</li> <br>
* <br>
*
* Load balance: <br>
- * Build-in kafka load balancing strategy, Consumers with different consumer.id and same group.id will distribute the reads from different partition<br>
- * There are at most #partition per topic could consuming in parallel
- * For more information see {@link http://kafka.apache.org/documentation.html#distributionimpl} <br>
- * <br><br>
+ * Built-in kafka load balancing strategy: consumers with different consumer.id and the same group.id will distribute
+ * the reads from different partitions<br>
+ * At most #partition streams per topic can consume in parallel. For more information see
+ * <a href="http://kafka.apache.org/documentation.html#distributionimpl">Kafka documentation</a> <br>
+ * <br>
+ * <br>
* Kafka broker failover: <br>
* Build-in failover strategy, the consumer will pickup the next available synchronized broker to consume data <br>
- * For more information see {@link http://kafka.apache.org/documentation.html#distributionimpl} <br>
+ * For more information see <a href="http://kafka.apache.org/documentation.html#distributionimpl">Kafka documentation</a> <br>
*
* @since 0.9.0
*/
public class HighlevelKafkaConsumer extends KafkaConsumer
{
private static final Logger logger = LoggerFactory.getLogger(HighlevelKafkaConsumer.class);
-
+
private Properties consumerConfig = null;
- private transient ConsumerConnector standardConsumer = null;
-
- private transient ExecutorService consumerThreadExecutor = null;
-
/**
- * -1 Dynamically create number of stream according to the partitions
- * < #{kafka partition} each stream could receive any message from any partition, order is not guaranteed among the partitions
- * > #{kafka partition} each stream consume message from one partition, some stream might not get any data
+ * Consumer client for topic on each cluster
*/
- @Min(value = -1)
- private int numStream = 1;
-
+ private transient Map<String, ConsumerConnector> standardConsumer = null;
+
+ private transient ExecutorService consumerThreadExecutor = null;
+
+ /**
+   * Number of streams for the topic on each cluster, keyed by cluster id. If null/empty, the number of streams created
+   * on each cluster equals the number of partitions of the topic on that cluster.
+ */
+ private Map<String, Integer> numStream;
+
public HighlevelKafkaConsumer()
{
+ numStream = new HashMap<String, Integer>();
}
-
+
public HighlevelKafkaConsumer(Properties consumerConfig)
{
super();
this.consumerConfig = consumerConfig;
}
-
+
@Override
public void create()
{
super.create();
- // This is important to let kafka know how to distribute the reads among different consumers in same consumer group
- // Don't reuse any id for recovery to avoid rebalancing error because there is some delay for zookeeper to
- // find out the old consumer is dead and delete the entry even new consumer is back online
+ if (standardConsumer == null) {
+ standardConsumer = new HashMap<String, ConsumerConnector>();
+ }
+
+ // This is important to let kafka know how to distribute the reads among
+ // different consumers in same consumer group
+ // Don't reuse any id for recovery to avoid rebalancing error because
+ // there is some delay for zookeeper to
+ // find out the old consumer is dead and delete the entry even new
+ // consumer is back online
consumerConfig.put("consumer.id", "consumer" + System.currentTimeMillis());
- if(initialOffset.equalsIgnoreCase("earliest")){
+ if (initialOffset.equalsIgnoreCase("earliest")) {
consumerConfig.put("auto.offset.reset", "smallest");
} else {
consumerConfig.put("auto.offset.reset", "largest");
}
- standardConsumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerConfig));
+
}
@Override
public void start()
{
super.start();
+    // Share other properties among all connectors, but set zookeeper.connect per cluster because different clusters use different zookeepers
+ for (String cluster : zookeeper.keySet()) {
+ // create high level consumer for every cluster
+ Properties config = new Properties();
+ config.putAll(consumerConfig);
+ config.setProperty("zookeeper.connect", Joiner.on(',').join(zookeeper.get(cluster)));
+      // Creating a consumer connector starts a daemon thread to monitor metadata changes,
+      // so connectors are created here rather than in create(): we don't want that thread running until the operator is activated
+ standardConsumer.put(cluster, kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(config)));
+ }
+
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
- int realNumStream = numStream;
- if (numStream == -1) {
- realNumStream = KafkaMetadataUtil.getPartitionsForTopic(getBrokerSet(), getTopic()).size();
- }
- topicCountMap.put(topic, new Integer(realNumStream));
- Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = standardConsumer.createMessageStreams(topicCountMap);
- // start $numStream anonymous threads to consume the data
- consumerThreadExecutor = Executors.newFixedThreadPool(realNumStream);
- for (final KafkaStream<byte[], byte[]> stream : consumerMap.get(topic)) {
- consumerThreadExecutor.submit(new Runnable() {
- public void run()
- {
- ConsumerIterator<byte[], byte[]> itr = stream.iterator();
- logger.debug("Thread " + Thread.currentThread().getName() + " start consuming message...");
- while (itr.hasNext() && isAlive) {
- MessageAndMetadata<byte[], byte[]> mam = itr.next();
- try {
- putMessage(mam.partition(), new Message(mam.message()));
- } catch (InterruptedException e) {
- logger.error("Message Enqueue has been interrupted", e);
- }
- }
- logger.debug("Thread " + Thread.currentThread().getName() + " stop consuming message...");
- }
- });
+ if (numStream == null || numStream.size() == 0) {
+ if (numStream == null) {
+ numStream = new HashMap<String, Integer>();
+ }
+ // get metadata from kafka and initialize streams accordingly
+ for (Entry<String, List<PartitionMetadata>> e : KafkaMetadataUtil.getPartitionsForTopic(brokers, topic).entrySet()) {
+ numStream.put(e.getKey(), e.getValue().size());
+ }
}
+
+ int totalNumStream = 0;
+ for (int delta : numStream.values()) {
+ totalNumStream += delta;
+ }
+ // start $totalNumStream anonymous threads to consume the data from all clusters
+ if (totalNumStream <= 0) {
+ logger.warn("No more job needed to consume data ");
+ return;
+ }
+ consumerThreadExecutor = Executors.newFixedThreadPool(totalNumStream);
+
+ for (final Entry<String, Integer> e : numStream.entrySet()) {
+
+ int realNumStream = e.getValue();
+ topicCountMap.put(topic, new Integer(realNumStream));
+ Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = standardConsumer.get(e.getKey()).createMessageStreams(topicCountMap);
+
+    for (final KafkaStream<byte[], byte[]> stream : consumerMap.get(topic)) {
+      consumerThreadExecutor.submit(new Runnable() {
+        // fresh KafkaPartition per message: it is queued with the message and used as a stats map key, never mutate it
+        final String clusterId = e.getKey();
+
+        public void run()
+        {
+          ConsumerIterator<byte[], byte[]> itr = stream.iterator();
+          logger.debug("Thread {} starts consuming message...", Thread.currentThread().getName());
+          while (itr.hasNext() && isAlive) {
+            MessageAndMetadata<byte[], byte[]> mam = itr.next();
+            try {
+              KafkaPartition kp = new KafkaPartition(clusterId, topic, mam.partition());
+              putMessage(kp, new Message(mam.message()), mam.offset());
+            } catch (InterruptedException e) {
+              logger.error("Message Enqueue has been interrupted", e);
+            }
+          }
+          logger.debug("Thread {} stops consuming message...", Thread.currentThread().getName());
+        }
+      });
+    }
+
+ }
+
}
@Override
public void close()
{
- if(standardConsumer!=null)
- standardConsumer.shutdown();
- if(consumerThreadExecutor!=null){
+ if (standardConsumer != null && standardConsumer.values() != null) {
+ for (ConsumerConnector consumerConnector : standardConsumer.values()) {
+ consumerConnector.shutdown();
+ }
+ }
+ if (consumerThreadExecutor != null) {
consumerThreadExecutor.shutdown();
}
}
-
public void setConsumerConfig(Properties consumerConfig)
{
this.consumerConfig = consumerConfig;
}
@Override
- protected KafkaConsumer cloneConsumer(Set<Integer> partitionIds)
+ protected void resetPartitionsAndOffset(Set<KafkaPartition> partitionIds, Map<KafkaPartition, Long> startOffset)
{
- return cloneConsumer(partitionIds, null);
- }
-
- @Override
- protected KafkaConsumer cloneConsumer(Set<Integer> partitionIds, Map<Integer, Long> startOffset)
- {
- Properties newProp = new Properties();
- // Copy most properties from the template consumer. For example the "group.id" should be set to same value
- newProp.putAll(consumerConfig);
- HighlevelKafkaConsumer newConsumer = new HighlevelKafkaConsumer(newProp);
- newConsumer.setBrokerSet(this.brokerSet);
- newConsumer.setTopic(this.topic);
- newConsumer.numStream = partitionIds.size();
- newConsumer.initialOffset = initialOffset;
- return newConsumer;
+ this.numStream = new HashMap<String, Integer>();
+ for (KafkaPartition kafkaPartition : partitionIds) {
+ if (this.numStream.get(kafkaPartition.getClusterId()) == null) {
+ this.numStream.put(kafkaPartition.getClusterId(), 0);
+ }
+ this.numStream.put(kafkaPartition.getClusterId(), this.numStream.get(kafkaPartition.getClusterId()) + 1);
+ }
}
@Override
protected void commitOffset()
{
- // commit the offsets at checkpoint so that high-level consumer don't have to receive too many duplicate messages
- standardConsumer.commitOffsets();
+ // commit the offsets at checkpoint so that high-level consumer don't
+ // have to receive too many duplicate messages
+ if (standardConsumer != null && standardConsumer.values() != null) {
+ for (ConsumerConnector consumerConnector : standardConsumer.values()) {
+ consumerConnector.commitOffsets();
+ }
+ }
}
@Override
- protected Map<Integer, Long> getCurrentOffsets()
+ protected Map<KafkaPartition, Long> getCurrentOffsets()
{
// offset is not useful for high-level kafka consumer
- // TODO
throw new UnsupportedOperationException("Offset request is currently not supported for high-level consumer");
}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java
index bf4e672..21615ef 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java
@@ -28,6 +28,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+
import kafka.message.Message;
import javax.validation.constraints.NotNull;
@@ -38,7 +39,11 @@
import org.apache.commons.lang3.StringUtils;
import com.datatorrent.api.Context;
+import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.collect.HashMultimap;
import com.google.common.collect.Maps;
+import com.google.common.collect.SetMultimap;
/**
* Base Kafka Consumer class used by kafka input operator
@@ -62,17 +67,17 @@
this.topic = topic;
}
- public KafkaConsumer(Set<String> brokerSet, String topic)
+ public KafkaConsumer(SetMultimap<String, String> zks, String topic)
{
this.topic = topic;
- this.brokerSet = brokerSet;
+ this.zookeeper = zks;
}
private int cacheSize = 1024;
protected transient boolean isAlive = false;
- private transient ArrayBlockingQueue<Message> holdingBuffer;
+ private transient ArrayBlockingQueue<KafkaMessage> holdingBuffer;
/**
* The topic that this consumer consumes
@@ -81,12 +86,14 @@
protected String topic = "default_topic";
/**
- * A broker list to retrieve the metadata for the consumer
- * This property could be null
- * But it's mandatory for dynamic partition and fail-over
+   * A multimap from cluster id to the zookeeper connect addresses of that cluster.
+   * It's a mandatory field.
*/
@NotNull
- protected Set<String> brokerSet;
+ @Bind(JavaSerializer.class)
+ protected SetMultimap<String, String> zookeeper;
+
+ protected transient SetMultimap<String, String> brokers;
/**
@@ -108,16 +115,32 @@
* This method is called in setup method of the operator
*/
public void create(){
- holdingBuffer = new ArrayBlockingQueue<Message>(cacheSize);
- };
+ initBrokers();
+ holdingBuffer = new ArrayBlockingQueue<KafkaMessage>(cacheSize);
+ }
+
+ public void initBrokers()
+ {
+ if(brokers!=null){
+ return ;
+ }
+ if(zookeeper!=null){
+ brokers = HashMultimap.create();
+ for (String clusterId: zookeeper.keySet()) {
+ brokers.putAll(clusterId, KafkaMetadataUtil.getBrokers(zookeeper.get(clusterId)));
+ }
+ }
+
+ }
/**
* This method is called in the activate method of the operator
*/
- public void start(){
+ public void start()
+ {
isAlive = true;
statsSnapShot.start();
- };
+ }
/**
* The method is called in the deactivate method of the operator
@@ -157,7 +180,7 @@
return topic;
}
- public Message pollMessage()
+ public KafkaMessage pollMessage()
{
return holdingBuffer.poll();
}
@@ -167,14 +190,14 @@
return holdingBuffer.size();
}
- public void setBrokerSet(Set<String> brokerSet)
+ public void setZookeeper(SetMultimap<String, String> zks)
{
- this.brokerSet = brokerSet;
+ this.zookeeper = zks;
}
- public Set<String> getBrokerSet()
+ public SetMultimap<String, String> getZookeeper()
{
- return brokerSet;
+ return zookeeper;
}
public void setInitialOffset(String initialOffset)
@@ -198,20 +221,15 @@
}
- final protected void putMessage(int partition, Message msg) throws InterruptedException{
+ final protected void putMessage(KafkaPartition partition, Message msg, long offset) throws InterruptedException{
// block from receiving more message
- holdingBuffer.put(msg);
+ holdingBuffer.put(new KafkaMessage(partition, msg, offset));
statsSnapShot.mark(partition, msg.payloadSize());
- };
-
-
- protected abstract KafkaConsumer cloneConsumer(Set<Integer> partitionIds);
-
- protected abstract KafkaConsumer cloneConsumer(Set<Integer> partitionIds, Map<Integer, Long> startOffset);
+ }
protected abstract void commitOffset();
- protected abstract Map<Integer, Long> getCurrentOffsets();
+ protected abstract Map<KafkaPartition, Long> getCurrentOffsets();
public KafkaMeterStats getConsumerStats()
{
@@ -219,6 +237,7 @@
return stats;
}
+ protected abstract void resetPartitionsAndOffset(Set<KafkaPartition> partitionIds, Map<KafkaPartition, Long> startOffset);
/**
* Counter class which gives the statistic value from the consumer
*/
@@ -230,7 +249,7 @@
/**
* A compact partition counter. The data collected for each partition is 4bytes brokerId + 1byte connected + 8bytes msg/s + 8bytes bytes/s + 8bytes offset
*/
- public ConcurrentHashMap<Integer, PartitionStats> partitionStats = new ConcurrentHashMap<Integer, PartitionStats>();
+ public ConcurrentHashMap<KafkaPartition, PartitionStats> partitionStats = new ConcurrentHashMap<KafkaPartition, PartitionStats>();
/**
@@ -246,9 +265,9 @@
}
- public void set_1minMovingAvgPerPartition(int pid, long[] _1minAvgPar)
+ public void set_1minMovingAvgPerPartition(KafkaPartition kp, long[] _1minAvgPar)
{
- PartitionStats ps = putPartitionStatsIfNotPresent(pid);
+ PartitionStats ps = putPartitionStatsIfNotPresent(kp);
ps.msgsPerSec = _1minAvgPar[0];
ps.bytesPerSec = _1minAvgPar[1];
}
@@ -259,8 +278,8 @@
totalBytesPerSec = _1minAvg[1];
}
- public void updateOffsets(Map<Integer, Long> offsets){
- for (Entry<Integer, Long> os : offsets.entrySet()) {
+ public void updateOffsets(Map<KafkaPartition, Long> offsets){
+ for (Entry<KafkaPartition, Long> os : offsets.entrySet()) {
PartitionStats ps = putPartitionStatsIfNotPresent(os.getKey());
ps.offset = os.getValue();
}
@@ -277,40 +296,55 @@
return r;
}
- public void updatePartitionStats(int partitionId,int brokerId, String host)
+ public void updatePartitionStats(KafkaPartition kp,int brokerId, String host)
{
- PartitionStats ps = putPartitionStatsIfNotPresent(partitionId);
+ PartitionStats ps = putPartitionStatsIfNotPresent(kp);
ps.brokerHost = host;
ps.brokerId = brokerId;
}
+
+ private synchronized PartitionStats putPartitionStatsIfNotPresent(KafkaPartition kp){
+ PartitionStats ps = partitionStats.get(kp);
- private synchronized PartitionStats putPartitionStatsIfNotPresent(int pid){
- PartitionStats ps = partitionStats.get(pid);
if (ps == null) {
ps = new PartitionStats();
- partitionStats.put(pid, ps);
+ partitionStats.put(kp, ps);
}
return ps;
}
}
+ public static class KafkaMessage
+ {
+ KafkaPartition kafkaPart;
+ Message msg;
+ long offSet;
+ public KafkaMessage(KafkaPartition kafkaPart, Message msg, long offset)
+ {
+ this.kafkaPart = kafkaPart;
+ this.msg = msg;
+ this.offSet = offset;
+ }
+
+ }
+
public static class KafkaMeterStatsUtil {
- public static Map<Integer, Long> getOffsetsForPartitions(List<KafkaMeterStats> kafkaMeterStats)
+ public static Map<KafkaPartition, Long> getOffsetsForPartitions(List<KafkaMeterStats> kafkaMeterStats)
{
- Map<Integer, Long> result = Maps.newHashMap();
+ Map<KafkaPartition, Long> result = Maps.newHashMap();
for (KafkaMeterStats kms : kafkaMeterStats) {
- for (Entry<Integer, PartitionStats> item : kms.partitionStats.entrySet()) {
+ for (Entry<KafkaPartition, PartitionStats> item : kms.partitionStats.entrySet()) {
result.put(item.getKey(), item.getValue().offset);
}
}
return result;
}
- public static Map<Integer, long[]> get_1minMovingAvgParMap(KafkaMeterStats kafkaMeterStats)
+ public static Map<KafkaPartition, long[]> get_1minMovingAvgParMap(KafkaMeterStats kafkaMeterStats)
{
- Map<Integer, long[]> result = Maps.newHashMap();
- for (Entry<Integer, PartitionStats> item : kafkaMeterStats.partitionStats.entrySet()) {
+ Map<KafkaPartition, long[]> result = Maps.newHashMap();
+ for (Entry<KafkaPartition, PartitionStats> item : kafkaMeterStats.partitionStats.entrySet()) {
result.put(item.getKey(), new long[]{item.getValue().msgsPerSec, item.getValue().bytesPerSec});
}
return result;
@@ -374,12 +408,12 @@
/**
* 1 min total msg number for each partition
*/
- private final Map<Integer, long[]> _1_min_msg_sum_par = new HashMap<Integer, long[]>();
+ private final Map<KafkaPartition, long[]> _1_min_msg_sum_par = new HashMap<KafkaPartition, long[]>();
/**
* 1 min total byte number for each partition
*/
- private final Map<Integer, long[]> _1_min_byte_sum_par = new HashMap<Integer, long[]>();
+ private final Map<KafkaPartition, long[]> _1_min_byte_sum_par = new HashMap<KafkaPartition, long[]>();
private static int cursor = 0;
@@ -404,7 +438,7 @@
bytesSec[60] -= bytesSec[cursor];
msgSec[cursor] = 0;
bytesSec[cursor] = 0;
- for (Entry<Integer, long[]> item : _1_min_msg_sum_par.entrySet()) {
+ for (Entry<KafkaPartition, long[]> item : _1_min_msg_sum_par.entrySet()) {
long[] msgv = item.getValue();
long[] bytesv = _1_min_byte_sum_par.get(item.getKey());
msgv[60] -= msgv[cursor];
@@ -437,7 +471,7 @@
}
}
- public synchronized void mark(int partition, long bytes){
+ public synchronized void mark(KafkaPartition partition, long bytes){
msgSec[cursor]++;
msgSec[60]++;
bytesSec[cursor] += bytes;
@@ -454,11 +488,11 @@
msgv[60]++;
bytev[cursor] += bytes;
bytev[60] += bytes;
- };
+ }
public synchronized void setupStats(KafkaMeterStats stat){
long[] _1minAvg = {msgSec[60]/last, bytesSec[60]/last};
- for (Entry<Integer, long[]> item : _1_min_msg_sum_par.entrySet()) {
+ for (Entry<KafkaPartition, long[]> item : _1_min_msg_sum_par.entrySet()) {
long[] msgv =item.getValue();
long[] bytev = _1_min_byte_sum_par.get(item.getKey());
long[] _1minAvgPar = {msgv[60]/last, bytev[60]/last};
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java
index 35b6f2a..960ad3b 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java
@@ -16,14 +16,27 @@
package com.datatorrent.contrib.kafka;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import scala.collection.JavaConversions;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Maps.EntryTransformer;
+import com.google.common.collect.SetMultimap;
+
import kafka.api.PartitionOffsetRequestInfo;
+import kafka.cluster.Broker;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
@@ -31,6 +44,8 @@
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
/**
* A util class used to retrieve all the metadatas for partitions/topics
@@ -49,7 +64,7 @@
private static Logger logger = LoggerFactory.getLogger(KafkaMetadataUtil.class);
// A temporary client used to retrieve the metadata of topic/partition etc
- private static final String mdClientId = "Kafka_Broker_Lookup_Client";
+ private static final String mdClientId = "Kafka_Metadata_Lookup_Client";
private static final int timeout=10000;
@@ -57,7 +72,7 @@
private static final int bufferSize = 128 * 1024;
/**
- * @param brokerList
+ * @param brokerList brokers in same cluster
* @param topic
* @return Get the partition metadata list for the specific topic via the brokerList <br>
* null if topic is not found
@@ -70,6 +85,34 @@
}
return tmd.partitionsMetadata();
}
+
+  /**
+   * @param brokers broker addresses in multiple clusters, keyed by cluster id
+   * @param topic
+   * @return A map from cluster id to the partition metadata list of the specific topic in that cluster. The brokers are
+   * queried exactly once per cluster; a cluster maps to null if the topic is not found there
+   */
+  public static Map<String, List<PartitionMetadata>> getPartitionsForTopic(SetMultimap<String, String> brokers, final String topic)
+  {
+    // Build the result eagerly: Maps.transformEntries returns a lazy view that would re-query the brokers on every access
+    Map<String, List<PartitionMetadata>> result = new HashMap<String, List<PartitionMetadata>>();
+    for (Map.Entry<String, Collection<String>> entry : brokers.asMap().entrySet()) {
+      result.put(entry.getKey(), getPartitionsForTopic(new HashSet<String>(entry.getValue()), topic));
+    }
+    return result;
+  }
+  public static Set<String> getBrokers(Set<String> zkHost){
+    ZkClient zkclient = new ZkClient(StringUtils.join(zkHost, ','), 30000, 30000, ZKStringSerializer$.MODULE$);
+    try {
+      Set<String> brokerHosts = new HashSet<String>();
+      for (Broker b : JavaConversions.asJavaIterable(ZkUtils.getAllBrokersInCluster(zkclient))) {
+        brokerHosts.add(b.getConnectionString());
+      }
+      return brokerHosts;
+    } finally { // always release the zookeeper connection, even if the broker lookup fails
+      zkclient.close();
+    }
+  }
/**
@@ -125,7 +168,7 @@
} catch (NumberFormatException e) {
throw new IllegalArgumentException("Wrong format for broker url, should be \"broker1:port1\"");
} catch (Exception e) {
- logger.error("Highly possible some broker(s) for topic {} are dead", topic, e);
+        logger.warn("Broker {} is unavailable or in bad state!", broker, e);
// skip and try next broker
}
}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaPartition.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaPartition.java
new file mode 100644
index 0000000..3a5652b
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaPartition.java
@@ -0,0 +1,118 @@
+package com.datatorrent.contrib.kafka;
+
+import java.io.Serializable;
+
+/**
+ * Identifies one partition of a kafka topic in a particular kafka cluster.
+ * <br>
+ * equals and hashCode are based on the mutable clusterId, topic and partitionId fields, so do not call the setters
+ * on an instance that is already used as a key of a hash based collection.
+ */
+public class KafkaPartition implements Serializable
+{
+  private static final long serialVersionUID = 7556802229202221546L;
+
+  /**
+   * Cluster id used by {@link #KafkaPartition(String, int)}, i.e. when no cluster id is given.
+   */
+  protected static final String DEFAULT_CLUSTERID = "com.datatorrent.contrib.kafka.defaultcluster";
+
+  private String clusterId;
+
+  private int partitionId;
+
+  private String topic;
+
+  // no-arg constructor, presumably needed by serialization frameworks (e.g. kryo) - confirm before removing
+  @SuppressWarnings("unused")
+  private KafkaPartition()
+  {
+  }
+
+  /**
+   * Creates a partition in the default cluster {@link #DEFAULT_CLUSTERID}.
+   *
+   * @param topic kafka topic name
+   * @param partitionId id of the partition within the topic
+   */
+  public KafkaPartition(String topic, int partitionId)
+  {
+    this(DEFAULT_CLUSTERID, topic, partitionId);
+  }
+
+  /**
+   * @param clusterId id of the kafka cluster hosting the topic
+   * @param topic kafka topic name
+   * @param partitionId id of the partition within the topic
+   */
+  public KafkaPartition(String clusterId, String topic, int partitionId)
+  {
+    this.clusterId = clusterId;
+    this.topic = topic;
+    this.partitionId = partitionId;
+  }
+
+  public String getClusterId()
+  {
+    return clusterId;
+  }
+
+  public void setClusterId(String clusterId)
+  {
+    this.clusterId = clusterId;
+  }
+
+  public int getPartitionId()
+  {
+    return partitionId;
+  }
+
+  public void setPartitionId(int partitionId)
+  {
+    this.partitionId = partitionId;
+  }
+
+  public String getTopic()
+  {
+    return topic;
+  }
+
+  public void setTopic(String topic)
+  {
+    this.topic = topic;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    // 31-based polynomial over clusterId, partitionId and topic; a null field contributes 0
+    int h = 31 + (clusterId == null ? 0 : clusterId.hashCode());
+    h = 31 * h + partitionId;
+    return 31 * h + (topic == null ? 0 : topic.hashCode());
+  }
+
+  @Override
+  public boolean equals(Object obj)
+  {
+    if (obj == this) {
+      return true;
+    }
+    if (obj == null || obj.getClass() != getClass()) {
+      return false;
+    }
+    KafkaPartition that = (KafkaPartition) obj;
+    return partitionId == that.partitionId && nullSafeEquals(clusterId, that.clusterId) && nullSafeEquals(topic, that.topic);
+  }
+
+  private static boolean nullSafeEquals(String a, String b)
+  {
+    return a == null ? b == null : a.equals(b);
+  }
+
+  @Override
+  public String toString()
+  {
+    return new StringBuilder("KafkaPartition [clusterId=").append(clusterId).append(", partitionId=").append(partitionId)
+        .append(", topic=").append(topic).append(']').toString();
+  }
+}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/OffsetManager.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/OffsetManager.java
index d1ce932..f0e9ba4 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/OffsetManager.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/OffsetManager.java
@@ -18,7 +18,7 @@
import java.util.Map;
/**
- * An offset manager interface used by {@link AbstractPartitionableKafkaInputOperator} to define the customized initial offsets and periodically update the current offsets of all the operators
+ * An offset manager interface used by {@link AbstractKafkaInputOperator} to define the customized initial offsets and periodically update the current offsets of all the operators
* <br>
* <br>
* Ex. you could write offset to hdfs and load it back when restart the application
@@ -35,16 +35,16 @@
* <br>
* The method is called at the first attempt of creating partitions and the return value is used as initial offset for simple consumer
*
- * @return Map of Kafka partition id as key and offset as value
+ * @return Map with the {@link KafkaPartition} as key and its offset as value
*/
- public Map<Integer, Long> loadInitialOffsets();
+ public Map<KafkaPartition, Long> loadInitialOffsets();
/**
* @param offsetsOfPartitions offsets for specified partitions, it is reported by individual operator instances
* <br>
- * The method is called every {@link AbstractPartitionableKafkaInputOperator#getRepartitionCheckInterval()} to update the current offset
+ * The method is called every {@link AbstractKafkaInputOperator#getRepartitionCheckInterval()} to update the current offset
*/
- public void updateOffsets(Map<Integer, Long> offsetsOfPartitions);
+ public void updateOffsets(Map<KafkaPartition, Long> offsetsOfPartitions);
}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/PartitionableKafkaSinglePortStringInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/PartitionableKafkaSinglePortStringInputOperator.java
deleted file mode 100644
index cb7c98a..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/PartitionableKafkaSinglePortStringInputOperator.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.contrib.kafka;
-
-import com.datatorrent.api.annotation.OperatorAnnotation;
-import kafka.message.Message;
-
-import java.nio.ByteBuffer;
-
-/**
- * Kafka input adapter operator with a single output port, which consumes String data from the Kafka message bus.
- * <p></p>
- *
- * @displayName Partitionable Kafka Single Port String Input
- * @category Messaging
- * @tags input operator, string
- *
- * @since 0.9.0
- */
-@OperatorAnnotation(partitionable = true)
-public class PartitionableKafkaSinglePortStringInputOperator extends AbstractPartitionableKafkaSinglePortInputOperator<String>
-{
- /**
- * Implement abstract method of AbstractPartitionableKafkaSinglePortInputOperator
- * Just parse the kafka message as a string
- */
- @Override
- public String getTuple(Message message)
- {
- String data = "";
- try {
- ByteBuffer buffer = message.payload();
- byte[] bytes = new byte[buffer.remaining()];
- buffer.get(bytes);
- data = new String(bytes);
-// System.out.println(data);
-// logger.debug("Consuming {}", data);
- }
- catch (Exception ex) {
- return data;
- }
- return data;
- }
-
- @Override
- protected AbstractPartitionableKafkaInputOperator cloneOperator()
- {
- return new PartitionableKafkaSinglePortStringInputOperator();
- }
-
-
-}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
index 5186486..3ad9f37 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
@@ -15,22 +15,31 @@
*/
package com.datatorrent.contrib.kafka;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
+
import javax.validation.constraints.NotNull;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.OffsetRequest;
@@ -42,37 +51,164 @@
import kafka.message.MessageAndOffset;
/**
- * Simple kafka consumer adaptor used by kafka input operator
- * Properties:<br>
+ * Simple kafka consumer adaptor used by kafka input operator. Properties:<br>
* <b>timeout</b>: Timeout for connection and ping <br>
* <b>bufferSize</b>: buffer size of the consumer <br>
* <b>clientId</b>: client id of the consumer <br>
* <b>partitionIds</b>: partition id that the consumer want to consume <br>
- * <li> (-1): create #partition threads and consumers to read the topic from different partitions in parallel</li>
- * <br>
+ * <li>(-1): create #partition threads and consumers to read the topic from different partitions in parallel</li> <br>
* <b>metadataRefreshInterval</b>: The interval that the monitor thread use to monitor the broker leadership change <br>
* <b>metadataRetrievalRetry</b>: Maximum retry times for metadata retrieval failures<br>
* default value is 3 <br>
- * -1: always retry <br>
+ * -1: unlimited retry <br>
* <br>
*
* Load balance: <br>
- * <li>Every consumer only connects to leader broker for particular partition once it's created</li>
- * <li>Once leadership change detected(leader broker failure, or server-side reassignment), it switches to the new leader broker</li>
- * <li>For server-side leadership change, see kafka-preferred-replica-election.sh and kafka-reassign-partitions.sh</li>
- * <li>There is ONE separate thread to monitor the leadership for all the partitions of the topic at every #metadataRefreshInterval milliseconds</li>
- * <br>
+ * <li>The consumer creates several data-consuming threads to consume the data from broker(s)</li>
+ * <li>Each thread has only ONE kafka client connecting to ONE broker to consume data from for multiple partitions </li>
+ * <li>
+ * There is ONE separate thread to monitor the leadership for all the partitions of the topic at every
+ * #metadataRefreshInterval milliseconds</li>
+ * <li>Once leadership
+ * change detected(leader broker failure, or server-side reassignment), it switches to the new leader broker</li> <li>
+ * For server-side leadership change, see kafka-preferred-replica-election.sh and kafka-reassign-partitions.sh</li> <br>
* <br>
* Kafka broker failover: <br>
- * <li>Once broker failure is detected, it waits #metadataRefreshInterval to reconnect to the new leader broker </li>
- * <li>If there are consecutive #metadataRetrievalRetry failures to retrieve the metadata for the topic. It will stop consuming the partition</li>
- * <br>
+ * <li>Once broker failure is detected, it waits #metadataRefreshInterval to reconnect to the new leader broker</li> <li>
+ * If there are consecutive #metadataRetrievalRetry failures to retrieve the metadata for the topic. It will stop
+ * consuming the partition</li> <br>
*
* @since 0.9.0
*/
public class SimpleKafkaConsumer extends KafkaConsumer
{
+  /**
+   * The data-consuming thread that uses one simple kafka client to connect to one broker which is the leader of the partition(s) this consumer is interested in
+   */
+  static final class ConsumerThread implements Runnable
+  {
+    private final Broker broker;
+    private final String clientName;
+    // kafka simple consumer object
+    private SimpleConsumer ksc;
+    // The SimpleKafkaConsumer which holds this thread
+    private SimpleKafkaConsumer consumer;
+    // partitions consumed in this thread; concurrent set because the monitor thread adds to it via addPartitions()
+    private final Set<KafkaPartition> kpS;
+    // set by the monitor thread and read by close(), volatile for cross-thread visibility
+    @SuppressWarnings("rawtypes")
+    private volatile Future threadItSelf;
+
+    private ConsumerThread(Broker broker, Set<KafkaPartition> kpl, SimpleKafkaConsumer consumer)
+    {
+      this.broker = broker;
+      this.clientName = consumer.getClientName(broker.host() + "_" + broker.port());
+      this.consumer = consumer;
+      this.kpS = Collections.newSetFromMap(new ConcurrentHashMap<KafkaPartition, Boolean>());
+      this.kpS.addAll(kpl);
+    }
+
+    @Override
+    public void run()
+    {
+      try {
+        logger.info("Connecting to broker {} [ timeout:{}, buffersize:{}, clientId: {}]", broker, consumer.timeout, consumer.bufferSize, clientName);
+        ksc = new SimpleConsumer(broker.host(), broker.port(), consumer.timeout, consumer.bufferSize, clientName);
+        // Initialize all start offsets for all kafka partitions read from this consumer
+        // read either from beginning of the broker or last offset committed by the operator
+        for (KafkaPartition kpForConsumer : kpS) {
+          logger.info("Start consuming data of topic {} ", kpForConsumer);
+          if (consumer.offsetTrack.get(kpForConsumer) != null) {
+            // start from recovery
+            logger.info("Partition {} initial offset {}", kpForConsumer, consumer.offsetTrack.get(kpForConsumer));
+          } else {
+            initOffset(kpForConsumer);
+          }
+        }
+
+        // stop consuming only when the consumer container is stopped or the metadata can not be refreshed
+        while (consumer.isAlive && (consumer.metadataRefreshRetryLimit == -1 || consumer.retryCounter.get() < consumer.metadataRefreshRetryLimit)) {
+          if (kpS.isEmpty()) {
+            return;
+          }
+
+          // Snapshot kpS: addPartitions() may add to it concurrently, and only requested partitions may be looked up in the response
+          Set<KafkaPartition> requested = new HashSet<KafkaPartition>(kpS);
+          FetchRequestBuilder frb = new FetchRequestBuilder().clientId(clientName);
+          // add all partition requests in one fetch request together
+          for (KafkaPartition kpForConsumer : requested) {
+            if (consumer.offsetTrack.get(kpForConsumer) == null) {
+              // partition handed over via addPartitions() after this thread started, its offset is not set yet
+              initOffset(kpForConsumer);
+            }
+            frb.addFetch(consumer.topic, kpForConsumer.getPartitionId(), consumer.offsetTrack.get(kpForConsumer), consumer.bufferSize);
+          }
+          FetchResponse fetchResponse = ksc.fetch(frb.build());
+          for (KafkaPartition kafkaPartition : requested) {
+            if (fetchResponse.hasError() && fetchResponse.errorCode(consumer.topic, kafkaPartition.getPartitionId()) != ErrorMapping.NoError()) {
+              // Kick off partition(s) which has error when fetch from this broker temporarily
+              // Monitor will find out which broker it goes in monitor thread
+              logger.warn("Error when consuming topic {} from broker {} with error code {} ", kafkaPartition, broker, fetchResponse.errorCode(consumer.topic, kafkaPartition.getPartitionId()));
+              kpS.remove(kafkaPartition);
+              consumer.partitionToBroker.remove(kafkaPartition);
+              consumer.stats.updatePartitionStats(kafkaPartition, -1, "");
+              continue;
+            }
+            // If the fetchResponse either has no error or the no error for $kafkaPartition get the data
+            long offset = -1L;
+            for (MessageAndOffset msg : fetchResponse.messageSet(consumer.topic, kafkaPartition.getPartitionId())) {
+              offset = msg.nextOffset();
+              consumer.putMessage(kafkaPartition, msg.message(), msg.offset());
+            }
+            if (offset != -1) {
+              consumer.offsetTrack.put(kafkaPartition, offset);
+            }
+          }
+        }
+      } catch (Exception e) {
+        // the exception is the trailing argument without a placeholder so that slf4j logs its stack trace
+        logger.error("The consumer encounters an unrecoverable exception. Close the connection to broker {}", broker, e);
+      } finally {
+        if (ksc != null) {
+          ksc.close();
+        }
+        for (KafkaPartition kpForConsumer : kpS) {
+          // Update consumer that these partitions are currently stop being consumed because of some unrecoverable exception
+          consumer.partitionToBroker.remove(kpForConsumer);
+        }
+        logger.info("Exit the consumer thread for broker {} ", broker);
+      }
+    }
+
+    /**
+     * Sets the start offset of the partition to the earliest or latest offset on the broker, as chosen by consumer.initialOffset
+     */
+    private void initOffset(KafkaPartition kp) throws Exception
+    {
+      long startOffsetReq = consumer.initialOffset.equalsIgnoreCase("earliest") ? OffsetRequest.EarliestTime() : OffsetRequest.LatestTime();
+      logger.info("Partition {} initial offset {} {}", kp.getPartitionId(), startOffsetReq, consumer.initialOffset);
+      consumer.offsetTrack.put(kp, KafkaMetadataUtil.getLastOffset(ksc, consumer.topic, kp.getPartitionId(), startOffsetReq, clientName));
+    }
+
+    public void addPartitions(Set<KafkaPartition> newKps)
+    {
+      // Add the partition(s) to this existing consumer thread they are assigned to this broker
+      kpS.addAll(newKps);
+    }
+
+    @SuppressWarnings("rawtypes")
+    public Future getThreadItSelf()
+    {
+      return threadItSelf;
+    }
+
+    public void setThreadItSelf(@SuppressWarnings("rawtypes") Future threadItSelf)
+    {
+      this.threadItSelf = threadItSelf;
+    }
+  }
+
public SimpleKafkaConsumer()
{
super();
@@ -83,30 +219,32 @@
this(topic, timeout, bufferSize, clientId, null);
}
- public SimpleKafkaConsumer(String topic, int timeout, int bufferSize, String clientId, Set<Integer> partitionIds)
+ public SimpleKafkaConsumer(String topic, int timeout, int bufferSize, String clientId, Set<KafkaPartition> partitionIds)
{
super(topic);
this.timeout = timeout;
this.bufferSize = bufferSize;
this.clientId = clientId;
- this.partitionIds = partitionIds;
+ this.kps = partitionIds;
}
- public SimpleKafkaConsumer(Set<String> brokerList, String topic, int timeout, int bufferSize, String clientId, Set<Integer> partitionIds)
+ public SimpleKafkaConsumer(SetMultimap<String, String> zks, String topic, int timeout, int bufferSize, String clientId, Set<KafkaPartition> partitionIds)
{
- super(brokerList, topic);
+ super(zks, topic);
this.timeout = timeout;
this.bufferSize = bufferSize;
this.clientId = clientId;
- this.partitionIds = partitionIds;
+ this.kps = partitionIds;
}
private static final Logger logger = LoggerFactory.getLogger(SimpleKafkaConsumer.class);
/**
- * Track thread for each partition, clean the resource if necessary
+ * Track consumers connected to each broker, topics and partitions hosted on same broker are consumed by same
+ * consumer. Clean the resource if necessary. Key is the kafka broker object.
*/
- private final transient HashMap<Integer, AtomicReference<SimpleConsumer>> simpleConsumerThreads = new HashMap<Integer, AtomicReference<SimpleConsumer>>();
+ private final transient Map<Broker, ConsumerThread> simpleConsumerThreads = new HashMap<Broker, ConsumerThread>();
+
private transient ExecutorService kafkaConsumerExecutor;
@@ -117,7 +255,6 @@
*/
private final transient AtomicInteger retryCounter = new AtomicInteger(0);
-
private int timeout = 10000;
/**
@@ -132,49 +269,54 @@
private String clientId = "Kafka_Simple_Client";
/**
- * interval in between reconnect if one kafka broker goes down in milliseconds
- * if metadataRefreshInterval < 0 it will never refresh the metadata
- * WARN: Turning off the refresh will disable failover to new broker
+ * Interval, in milliseconds, between metadata (broker change) refreshes. The refresh lets the consumer reconnect
+ * automatically to a broker that is newly elected as partition leader. Disable metadata refresh by setting this to -1.
+ * WARN: Turning off the refresh will disable auto reconnect to new broker
*/
private int metadataRefreshInterval = 30000;
-
/**
- * Maximum brokers' metadata refresh retry limit
- * -1 means unlimited retry
+ * Maximum brokers' metadata refresh retry limit. -1 means unlimited retry
*/
private int metadataRefreshRetryLimit = -1;
/**
- * You can setup your particular partitionID you want to consume with *simple
- * kafka consumer*. Use this to maximize the distributed performance.
- * By default it's -1 which means #partitionSize anonymous threads will be
- * created to consume tuples from different partition
+ * The kafka partitions this consumer client should consume. This can be used to share clients and threads and to
+ * maximize the overall performance. If null or empty, all partitions of the topic are consumed, with one thread and
+ * one client per broker that leads any of those partitions.
+ * Each thread consumes one or more partitions from one broker.
*/
- private Set<Integer> partitionIds = new HashSet<Integer>();
+ private Set<KafkaPartition> kps = new HashSet<KafkaPartition>();
-
+ // Maps each kafka partition to its current leader broker; kept up to date by the metadata monitor thread
+ private transient final ConcurrentHashMap<KafkaPartition, Broker> partitionToBroker = new ConcurrentHashMap<KafkaPartition, Broker>();
+
/**
- * Track offset for each partition, so operator could start from the last serialized state
- * Use ConcurrentHashMap to avoid ConcurrentModificationException without blocking reads when updating in another thread(hashtable or synchronizedmap)
+ * Track offset for each partition, so operator could start from the last serialized state. Use ConcurrentHashMap to
+ * avoid ConcurrentModificationException without blocking reads when updating in another thread (unlike Hashtable or
+ * synchronizedMap)
*/
- private final ConcurrentHashMap<Integer, Long> offsetTrack = new ConcurrentHashMap<Integer, Long>();
+ private final ConcurrentHashMap<KafkaPartition, Long> offsetTrack = new ConcurrentHashMap<KafkaPartition, Long>();
@Override
public void create()
{
super.create();
- List<PartitionMetadata> partitionMetaList = KafkaMetadataUtil.getPartitionsForTopic(brokerSet, topic);
- boolean defaultSelect = (partitionIds == null) || (partitionIds.size() == 0);
+ Map<String, List<PartitionMetadata>> partitionMetas = KafkaMetadataUtil.getPartitionsForTopic(brokers, topic);
+ if (kps == null) {
+ kps = new HashSet<KafkaPartition>();
+ }
+ if (kps.size() != 0) {
+ return;
+ }
- // if partition ids are null or not specified , find all the partitions for
- // the specific topic else create the consumers of specified partition ids
- for (PartitionMetadata part : partitionMetaList) {
- final String clientName = getClientName(part.partitionId());
- if (defaultSelect || partitionIds.contains(part.partitionId())) {
- logger.info("Connecting to {}:{} [timeout:{}, buffersize:{}, clientId: {}]", part.leader().host(), part.leader().port(), timeout, bufferSize, clientName);
- simpleConsumerThreads.put(part.partitionId(), new AtomicReference<SimpleConsumer>(new SimpleConsumer(part.leader().host(), part.leader().port(), timeout, bufferSize, clientName)));
- stats.updatePartitionStats(part.partitionId(), part.leader().id(), part.leader().host() + ":" + part.leader().port());
+ // if partition ids are null or not specified , find all the partition metadata for
+ // the specific topic from broker
+ for (Entry<String, List<PartitionMetadata>> en : partitionMetas.entrySet()) {
+ String clusterId = en.getKey();
+ for (PartitionMetadata part : en.getValue()) {
+ KafkaPartition kp = new KafkaPartition(clusterId, topic, part.partitionId());
+ kps.add(kp);
}
}
@@ -186,143 +328,89 @@
super.start();
// thread to consume the kafka data
- kafkaConsumerExecutor = Executors.newFixedThreadPool(simpleConsumerThreads.size(), new ThreadFactoryBuilder().setNameFormat("kafka-consumer-" + topic + "-%d").build());
+ kafkaConsumerExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("kafka-consumer-" + topic + "-%d").build());
+
+ if(metadataRefreshInterval <= 0) {
+ return;
+ }
// background thread to monitor the kafka metadata change
- metadataRefreshExecutor = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
- .setNameFormat("kafka-consumer-monitor-" + topic + "-%d").setDaemon(true).build());
+ metadataRefreshExecutor = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("kafka-consumer-monitor-" + topic + "-%d").setDaemon(true).build());
// start one monitor thread to monitor the leader broker change and trigger some action
- if (metadataRefreshInterval > 0) {
- metadataRefreshExecutor.scheduleAtFixedRate(new Runnable() {
+ final SimpleKafkaConsumer ref = this;
+ metadataRefreshExecutor.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run()
- {
- if (isAlive && (metadataRefreshRetryLimit == -1 || retryCounter.get() < metadataRefreshRetryLimit)) {
- logger.debug("{}: Update metadata for topic {}", Thread.currentThread().getName(), topic);
- List<PartitionMetadata> pms = KafkaMetadataUtil.getPartitionsForTopic(brokerSet, topic);
- if (pms == null) {
- // retrieve metadata fail
- retryCounter.getAndAdd(1);
- }
- Set<String> newBrokerSet = new HashSet<String>();
- for (PartitionMetadata pm : pms) {
- for (Broker b : pm.isr()) {
- newBrokerSet.add(b.host() + ":" + b.port());
- }
- int pid = pm.partitionId();
- if (simpleConsumerThreads.containsKey(pid)) {
- SimpleConsumer sc = simpleConsumerThreads.get(pid).get();
- if (sc != null && sc.host().equals(pm.leader().host()) && sc.port() == pm.leader().port()) {
- continue;
- }
- // clean the consumer to reestablish the new connection
- logger.info("Detected leader broker change, reconnecting to {}:{} for partition {}", pm.leader().host(), pm.leader().port(), pid);
- cleanAndSet(pid, new SimpleConsumer(pm.leader().host(), pm.leader().port(), timeout, bufferSize, getClientName(pid)));
- stats.updatePartitionStats(pid, pm.leader().id(), pm.leader().host() + ":" + pm.leader().port());
- }
- }
- brokerSet = newBrokerSet;
- // reset to 0 if it reconnect to the broker which has current broker metadata
- retryCounter.set(0);
+ private transient final SetMultimap<Broker, KafkaPartition> deltaPositive = HashMultimap.create();
+
+ @Override
+ public void run()
+ {
+ if (isAlive && (metadataRefreshRetryLimit == -1 || retryCounter.get() < metadataRefreshRetryLimit)) {
+ logger.debug("{}: Update metadata for topic {}", Thread.currentThread().getName(), topic);
+ Map<String, List<PartitionMetadata>> pms = KafkaMetadataUtil.getPartitionsForTopic(brokers, topic);
+ if (pms == null) {
+ // retrieve metadata fail add retry count and return
+ retryCounter.getAndAdd(1);
+ return;
}
+
+ for (Entry<String, List<PartitionMetadata>> pmLEntry : pms.entrySet()) {
+ for (PartitionMetadata pm : pmLEntry.getValue()) {
+ KafkaPartition kp = new KafkaPartition(pmLEntry.getKey(), topic, pm.partitionId());
+ if (!kps.contains(kp)) {
+ // Out of this consumer's scope
+ continue;
+ }
+ Broker b = pm.leader();
+ Broker oldB = partitionToBroker.put(kp, b);
+ if(b.equals(oldB)) {
+ continue;
+ }
+ // add to positive
+ deltaPositive.put(b,kp);
+
+ // always update the latest connection information
+ stats.updatePartitionStats(kp, pm.leader().id(), pm.leader().host() + ":" + pm.leader().port());
+ }
+ }
+
+ // remove a thread from the map once it is done (for example, all partitions on its broker have been reassigned to
+ // other brokers or are temporarily unavailable)
+ for (Iterator<Entry<Broker, ConsumerThread>> iterator = simpleConsumerThreads.entrySet().iterator(); iterator.hasNext();) {
+ Entry<Broker, ConsumerThread> item = iterator.next();
+ if (item.getValue().getThreadItSelf().isDone()) {
+ iterator.remove();
+ }
+ }
+
+ for (Broker b : deltaPositive.keySet()) {
+ if (!simpleConsumerThreads.containsKey(b)) {
+ // start thread for new broker
+ ConsumerThread ct = new ConsumerThread(b, deltaPositive.get(b), ref);
+ ct.setThreadItSelf(kafkaConsumerExecutor.submit(ct));
+ simpleConsumerThreads.put(b, ct);
+
+ } else {
+ simpleConsumerThreads.get(b).addPartitions(deltaPositive.get(b));
+ }
+ }
+
+ deltaPositive.clear();
+
+ // reset to 0 if it reconnect to the broker which has current broker metadata
+ retryCounter.set(0);
}
- }, 0, metadataRefreshInterval, TimeUnit.MILLISECONDS);
- }
-
-
-
- for (final Integer pid : simpleConsumerThreads.keySet()) {
- // initialize the stats snapshot for this partition
- statsSnapShot.mark(pid, 0);
- final String clientName = getClientName(pid);
- kafkaConsumerExecutor.submit(new Runnable() {
-
- AtomicReference<SimpleConsumer> csInThreadRef = simpleConsumerThreads.get(pid);
-
- @Override
- public void run()
- {
- // read either from beginning of the broker or last offset committed by the operator
- long offset = 0L;
- if(offsetTrack.get(pid)!=null){
- //start from recovery
- offset = offsetTrack.get(pid);
- logger.debug("Partition {} offset {}", pid, offset);
- } else {
- long startOffsetReq = initialOffset.equalsIgnoreCase("earliest")? OffsetRequest.EarliestTime() : OffsetRequest.LatestTime();
- logger.debug("Partition {} initial offset {} {}", pid, startOffsetReq, initialOffset);
- offset = KafkaMetadataUtil.getLastOffset(csInThreadRef.get(), topic, pid, startOffsetReq, clientName);
- }
- try {
- // stop consuming only when the consumer container is stopped or the metadata can not be refreshed
- while (isAlive && (metadataRefreshRetryLimit == -1 || retryCounter.get() < metadataRefreshRetryLimit)) {
-
- try {
- FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(topic, pid, offset, bufferSize).build();
- SimpleConsumer sc = csInThreadRef.get();
- if (sc == null) {
- if (metadataRefreshInterval > 0) {
- Thread.sleep(metadataRefreshInterval + 1000);
- } else {
- Thread.sleep(100);
- }
- }
- FetchResponse fetchResponse = csInThreadRef.get().fetch(req);
-
- if (fetchResponse.hasError() && fetchResponse.errorCode(topic, pid) == ErrorMapping.OffsetOutOfRangeCode()) {
- // If OffsetOutOfRangeCode happen, it means all msgs have been consumed, clean the consumer and return
- cleanAndSet(pid, null);
- stats.updatePartitionStats(pid, -1, "");
- return;
- } else if (fetchResponse.hasError()) {
- // If error happen, assume
- throw new Exception("Fetch message error, try to reconnect to broker");
- }
-
- for (MessageAndOffset msg : fetchResponse.messageSet(topic, pid)) {
- offset = msg.nextOffset();
- putMessage(pid, msg.message());
- }
- offsetTrack.put(pid, offset);
-
- } catch (Exception e) {
- logger.error("The consumer encounters an exception. Close the connection to partition {} ", pid, e);
- cleanAndSet(pid, null);
- stats.updatePartitionStats(pid, -1, "");
- }
- }
- } finally {
- // close the consumer
- if(csInThreadRef.get()!=null){
- csInThreadRef.get().close();
- }
- logger.info("Exit the consumer thread for partition {} ", pid);
- }
- }
-
- });
- }
- }
-
- private void cleanAndSet(Integer pid, SimpleConsumer newConsumer)
- {
- SimpleConsumer sc = simpleConsumerThreads.get(pid).getAndSet(newConsumer);
- if (sc != null) {
- logger.info("Close old connection to partition {}", pid);
- sc.close();
- }
+ }
+ }, 0, metadataRefreshInterval, TimeUnit.MILLISECONDS);
}
@Override
public void close()
{
logger.info("Stop all consumer threads");
- for(AtomicReference<SimpleConsumer> simConsumerRef : simpleConsumerThreads.values()){
- if(simConsumerRef.get()!=null) {
- simConsumerRef.get().close();
- }
+ for (ConsumerThread ct : simpleConsumerThreads.values()) {
+ ct.getThreadItSelf().cancel(true);
}
simpleConsumerThreads.clear();
metadataRefreshExecutor.shutdownNow();
@@ -380,64 +468,50 @@
}
@Override
- protected KafkaConsumer cloneConsumer(Set<Integer> partitionIds, Map<Integer, Long> startOffset)
- {
- // create different client for same partition
- SimpleKafkaConsumer skc = new SimpleKafkaConsumer(brokerSet, topic, timeout, bufferSize, clientId, partitionIds);
- skc.setCacheSize(getCacheSize());
- skc.setMetadataRefreshInterval(getMetadataRefreshInterval());
- skc.setMetadataRefreshRetryLimit(getMetadataRefreshRetryLimit());
- skc.initialOffset = this.initialOffset;
- skc.resetOffset(startOffset);
- skc.setCacheSize(getCacheSize());
- return skc;
- }
-
- @Override
- protected KafkaConsumer cloneConsumer(Set<Integer> partitionIds){
- return cloneConsumer(partitionIds, null);
- }
-
- @Override
protected void commitOffset()
{
// the simple consumer offset is kept in the offsetTrack
- // It's better to do server registry for client in the future. Wait for kafka community come up with more sophisticated offset management
- //TODO https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management#
+ // It's better to do server registry for client in the future. Wait for kafka community come up with more
+ // sophisticated offset management
+ // TODO https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management#
}
-
- private String getClientName(int pid){
- return clientId + SIMPLE_CONSUMER_ID_SUFFIX + pid;
+ private String getClientName(String brokerName)
+ {
+ return clientId + SIMPLE_CONSUMER_ID_SUFFIX + brokerName;
}
@Override
- protected Map<Integer, Long> getCurrentOffsets()
+ protected Map<KafkaPartition, Long> getCurrentOffsets()
{
return offsetTrack;
}
- private void resetOffset(Map<Integer, Long> overrideOffset){
-
- if(overrideOffset == null){
+ public void resetOffset(Map<KafkaPartition, Long> overrideOffset)
+ {
+ if (overrideOffset == null) {
return;
}
offsetTrack.clear();
// set offset of the partitions assigned to this consumer
- for (Integer pid: partitionIds) {
- Long offsetForPar = overrideOffset.get(pid);
+ for (KafkaPartition kp : kps) {
+ Long offsetForPar = overrideOffset.get(kp);
if (offsetForPar != null) {
- offsetTrack.put(pid, offsetForPar);
+ offsetTrack.put(kp, offsetForPar);
}
}
}
- @Override
- public KafkaMeterStats getConsumerStats()
+ public KafkaMeterStats getConsumerStats(Map<KafkaPartition, Long> offsetStats)
{
- stats.updateOffsets(offsetTrack);
+ stats.updateOffsets(offsetStats);
return super.getConsumerStats();
}
-
+ @Override
+ protected void resetPartitionsAndOffset(Set<KafkaPartition> partitionIds, Map<KafkaPartition, Long> startOffset)
+ {
+ this.kps = partitionIds;
+ resetOffset(startOffset);
+ }
} // End of SimpleKafkaConsumer
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
index 190af78..657676c 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
@@ -15,29 +15,41 @@
*/
package com.datatorrent.contrib.kafka;
-import java.util.*;
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.BaseOperator;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-
+import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
import org.slf4j.LoggerFactory;
-import com.datatorrent.api.*;
-import com.datatorrent.api.DAG.Locality;
-
public class KafkaInputOperatorTest extends KafkaOperatorTestBase
{
static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class);
- static HashMap<String, List<?>> collections = new HashMap<String, List<?>>();
static AtomicInteger tupleCount = new AtomicInteger();
static CountDownLatch latch;
/**
* Test Operator to collect tuples from KafkaSingleInputStringOperator.
- *
+ *
* @param <T>
*/
public static class CollectorModule<T> extends BaseOperator
@@ -47,13 +59,10 @@
public static class CollectorInputPort<T> extends DefaultInputPort<T>
{
- ArrayList<T> list;
- final String id;
public CollectorInputPort(String id, Operator module)
{
super();
- this.id = id;
}
@Override
@@ -65,16 +74,13 @@
}
return;
}
- list.add(tuple);
tupleCount.incrementAndGet();
}
@Override
public void setConnected(boolean flag)
{
- if (flag) {
- collections.put(id, list = new ArrayList<T>());
- }
+ tupleCount.set(0);
}
}
@@ -83,20 +89,20 @@
* Kafka, aka consumer). This module receives data from an outside test
* generator through Kafka message bus and feed that data into Malhar
* streaming platform.
- *
+ *
* [Generate message and send that to Kafka message bus] ==> [Receive that
* message through Kafka input adapter(i.e. consumer) and send using
* emitTuples() interface on output port during onMessage call]
- *
- *
+ *
+ *
* @throws Exception
*/
- public void testKafkaInputOperator(int sleepTime, final int totalCount, KafkaConsumer consumer, boolean isValid) throws Exception
+ public void testKafkaInputOperator(int sleepTime, final int totalCount, KafkaConsumer consumer, boolean isValid, boolean idempotent) throws Exception
{
// initial the latch for this test
latch = new CountDownLatch(1);
-
-
+
+
// Start producer
KafkaTestProducer p = new KafkaTestProducer(TEST_TOPIC);
p.setSendCount(totalCount);
@@ -110,14 +116,15 @@
// Create KafkaSinglePortStringInputOperator
KafkaSinglePortStringInputOperator node = dag.addOperator("Kafka message consumer", KafkaSinglePortStringInputOperator.class);
+ node.setIdempotent(idempotent);
consumer.setTopic(TEST_TOPIC);
- if (isValid) {
- Set<String> brokerSet = new HashSet<String>();
- brokerSet.add("localhost:9092");
- consumer.setBrokerSet(brokerSet);
- }
+
node.setConsumer(consumer);
-
+
+ if (isValid) {
+ node.setZookeeper("localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
+ }
+
// Create Test tuple collector
CollectorModule<String> collector = dag.addOperator("TestMessageCollector", new CollectorModule<String>());
@@ -129,51 +136,57 @@
lc.setHeartbeatMonitoringEnabled(false);
lc.runAsync();
-
+
// Wait 30s for consumer finish consuming all the messages
- Assert.assertTrue("TIMEOUT: 30s ", latch.await(30000, TimeUnit.MILLISECONDS));
-
+ Assert.assertTrue("TIMEOUT: 30s ", latch.await(300000, TimeUnit.MILLISECONDS));
+
// Check results
- Assert.assertEquals("Collections size", 1, collections.size());
- Assert.assertEquals("Tuple count", totalCount, collections.get(collector.inputPort.id).size());
- logger.debug(String.format("Number of emitted tuples: %d", collections.get(collector.inputPort.id).size()));
-
+ Assert.assertEquals("Tuple count", totalCount, tupleCount.intValue());
+ logger.debug(String.format("Number of emitted tuples: %d", tupleCount.intValue()));
+
p.close();
lc.shutdown();
}
@Test
- public void testKafkaInputOperator_Highleverl() throws Exception
+ public void testKafkaInputOperator_Highlevel() throws Exception
{
int totalCount = 10000;
Properties props = new Properties();
- props.put("zookeeper.connect", "localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT);
props.put("group.id", "group1");
- props.put("consumer.id", "default_consumer");
// This damn property waste me 2 days! It's a 0.8 new property. "smallest" means
// reset the consumer to the beginning of the message that is not consumed yet
// otherwise it wont get any of those the produced before!
KafkaConsumer k = new HighlevelKafkaConsumer(props);
k.setInitialOffset("earliest");
- testKafkaInputOperator(1000, totalCount, k, true);
+ testKafkaInputOperator(1000, totalCount, k, true, false);
}
-
+
@Test
public void testKafkaInputOperator_Simple() throws Exception
{
int totalCount = 10000;
KafkaConsumer k = new SimpleKafkaConsumer();
k.setInitialOffset("earliest");
- testKafkaInputOperator(1000, totalCount, k, true);
+ testKafkaInputOperator(1000, totalCount, k, true, false);
}
-
+
+ @Test
+ public void testKafkaInputOperator_Simple_Idempotent() throws Exception
+ {
+ int totalCount = 10000;
+ KafkaConsumer k = new SimpleKafkaConsumer();
+ k.setInitialOffset("earliest");
+ testKafkaInputOperator(1000, totalCount, k, true, true);
+ }
+
@Test
public void testKafkaInputOperator_Invalid() throws Exception
{
int totalCount = 10000;
SimpleKafkaConsumer consumer = new SimpleKafkaConsumer();
try{
- testKafkaInputOperator(1000, totalCount,consumer, false);
+ testKafkaInputOperator(1000, totalCount,consumer, false, false);
}catch(Exception e){
// invalid host setup expect to fail here
Assert.assertEquals("Error creating local cluster", e.getMessage());
@@ -184,7 +197,119 @@
@After
public void afterTest()
{
- collections.clear();
+ tupleCount.set(0);
super.afterTest();
}
+
+ public static class TestMeta extends TestWatcher
+ {
+ String baseDir;
+ String recoveryDir;
+ KafkaSinglePortStringInputOperator operator;
+ CollectorTestSink<Object> sink;
+ Context.OperatorContext context;
+
+ @Override
+ protected void starting(Description description)
+ {
+ String methodName = description.getMethodName();
+ String className = description.getClassName();
+ baseDir = "target/" + className + "/" + methodName;
+ recoveryDir = baseDir + "/" + "recovery";
+ try {
+ FileUtils.deleteDirectory(new File(recoveryDir));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Rule
+ public TestMeta testMeta = new TestMeta();
+
+ @Test
+ public void testRecoveryAndIdempotency() throws Exception
+ {
+ int totalCount = 1500;
+
+ // initial the latch for this test
+ latch = new CountDownLatch(50);
+
+ // Start producer
+ KafkaTestProducer p = new KafkaTestProducer(TEST_TOPIC);
+ p.setSendCount(totalCount);
+ new Thread(p).start();
+
+ Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+ attributeMap.put(Context.OperatorContext.SPIN_MILLIS, 500);
+
+ testMeta.context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributeMap);
+ testMeta.operator = new KafkaSinglePortStringInputOperator();
+ ((IdempotentStorageManager.FSIdempotentStorageManager) testMeta.operator.getIdempotentStorageManager()).setRecoveryPath(testMeta.recoveryDir);
+
+ KafkaConsumer consumer = new SimpleKafkaConsumer();
+ consumer.setTopic(TEST_TOPIC);
+ consumer.setInitialOffset("earliest");
+
+ testMeta.operator.setConsumer(consumer);
+ testMeta.operator.setZookeeper("localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
+ testMeta.operator.setMaxTuplesPerWindow(500);
+ testMeta.sink = new CollectorTestSink<Object>();
+ testMeta.operator.outputPort.setSink(testMeta.sink);
+
+ testMeta.operator.setup(testMeta.context);
+ testMeta.operator.activate(testMeta.context);
+ latch.await(4000, TimeUnit.MILLISECONDS);
+ testMeta.operator.beginWindow(1);
+ testMeta.operator.emitTuples();
+ testMeta.operator.endWindow();
+ testMeta.operator.beginWindow(2);
+ testMeta.operator.emitTuples();
+ testMeta.operator.endWindow();
+
+ //failure and then re-deployment of operator
+ testMeta.sink.collectedTuples.clear();
+ testMeta.operator.teardown();
+ testMeta.operator.setup(testMeta.context);
+
+ Assert.assertEquals("largest recovery window", 2, testMeta.operator.getIdempotentStorageManager().getLargestRecoveryWindow());
+
+ testMeta.operator.beginWindow(1);
+ testMeta.operator.emitTuples();
+ testMeta.operator.endWindow();
+ testMeta.operator.beginWindow(2);
+ testMeta.operator.emitTuples();
+ testMeta.operator.endWindow();
+ latch.await(3000, TimeUnit.MILLISECONDS);
+ // Emiting data after all recovery windows are replayed
+ testMeta.operator.beginWindow(3);
+ testMeta.operator.emitTuples();
+ testMeta.operator.endWindow();
+
+ Assert.assertEquals("Total messages collected ", totalCount, testMeta.sink.collectedTuples.size());
+ testMeta.sink.collectedTuples.clear();
+ }
+
+ @Test
+ public void testZookeeper() throws Exception
+ {
+ // initial the latch for this test
+ latch = new CountDownLatch(50);
+
+ testMeta.operator = new KafkaSinglePortStringInputOperator();
+
+ KafkaConsumer consumer = new SimpleKafkaConsumer();
+ consumer.setTopic(TEST_TOPIC);
+
+ testMeta.operator.setConsumer(consumer);
+ testMeta.operator.setZookeeper("cluster1::node0,node1,node2:2181,node3:2182;cluster2::node4:2181");
+ latch.await(500, TimeUnit.MILLISECONDS);
+
+ Assert.assertEquals("Total size of clusters ", 5, testMeta.operator.getConsumer().getZookeeper().size());
+ Assert.assertEquals("Number of nodes in cluster1 ", 4, testMeta.operator.getConsumer().getZookeeper().get("cluster1").size());
+ Assert.assertEquals("Nodes in cluster1 ", "[node0:2181, node2:2181, node3:2182, node1:2181]", testMeta.operator.getConsumer().getZookeeper().get("cluster1").toString());
+ Assert.assertEquals("Number of nodes in cluster2 ", 1, testMeta.operator.getConsumer().getZookeeper().get("cluster2").size());
+ Assert.assertEquals("Nodes in cluster2 ", "[node4:2181]", testMeta.operator.getConsumer().getZookeeper().get("cluster2").toString());
+ }
+
}
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOperatorTestBase.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOperatorTestBase.java
index 2f7e485..2a96562 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOperatorTestBase.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOperatorTestBase.java
@@ -26,107 +26,127 @@
import kafka.utils.Utils;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.junit.After;
import org.junit.Before;
import org.slf4j.LoggerFactory;
/**
- * This is a base class setup/clean Kafka testing environment for all the input/output test
- * If it's a multipartition test, this class creates 2 kafka partitions
+ * This is a base class setup/clean Kafka testing environment for all the input/output test If it's a multipartition
+ * test, this class creates 2 kafka partitions
*/
public class KafkaOperatorTestBase
{
public static final String END_TUPLE = "END_TUPLE";
- public static final int TEST_ZOOKEEPER_PORT = 2182;
- public static final int TEST_KAFKA_BROKER1_PORT = 9092;
- public static final int TEST_KAFKA_BROKER2_PORT = 9093;
+ public static final int[] TEST_ZOOKEEPER_PORT = new int[] { 2182, 2183 };
+ public static final int[][] TEST_KAFKA_BROKER_PORT = new int[][] { new int[] { 9092, 9093 }, new int[] { 9094, 9095 } };
public static final String TEST_TOPIC = "test_topic";
static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaOperatorTestBase.class);
// since Kafka 0.8 use KafkaServerStatble instead of KafkaServer
- private KafkaServerStartable kserver;
- // it wont be initialized unless hasMultiPartition is set to true
- private KafkaServerStartable kserver2;
- private NIOServerCnxnFactory standaloneServerFactory;
+
+ // multiple brokers in multiple cluster
+ private final KafkaServerStartable[][] broker = new KafkaServerStartable[2][2];
+
+ // multiple cluster
+ private final ServerCnxnFactory[] zkFactory = new ServerCnxnFactory[2];
public String baseDir = "/tmp";
- private final String zkdir = "zookeeper-server-data";
- private final String kafkadir1 = "kafka-server-data1";
- private final String kafkadir2 = "kafka-server-data2";
+ private final String zkBaseDir = "zookeeper-server-data";
+ private final String kafkaBaseDir = "kafka-server-data";
+ private final String[] zkdir = new String[] { "zookeeper-server-data/1", "zookeeper-server-data/2" };
+ private final String[][] kafkadir = new String[][] { new String[] { "kafka-server-data/1/1", "kafka-server-data/1/2" }, new String[] { "kafka-server-data/2/1", "kafka-server-data/2/2" } };
protected boolean hasMultiPartition = false;
+ protected boolean hasMultiCluster = false;
-
- public void startZookeeper()
+ public void startZookeeper(final int clusterId)
{
try {
- int clientPort = TEST_ZOOKEEPER_PORT;
+
+ int clientPort = TEST_ZOOKEEPER_PORT[clusterId];
int numConnections = 10;
int tickTime = 2000;
- File dir = new File(baseDir, zkdir);
+ File dir = new File(baseDir, zkdir[clusterId]);
- ZooKeeperServer kserver = new ZooKeeperServer(dir, dir, tickTime);
- standaloneServerFactory = new NIOServerCnxnFactory();
- standaloneServerFactory.configure(new InetSocketAddress(clientPort), numConnections);
- standaloneServerFactory.startup(kserver); // start the zookeeper server.
- //kserver.startup();
- }
- catch (InterruptedException ex) {
- logger.debug(ex.getLocalizedMessage());
- }
- catch (IOException ex) {
+ TestZookeeperServer kserver = new TestZookeeperServer(dir, dir, tickTime);
+ zkFactory[clusterId] = new NIOServerCnxnFactory();
+ zkFactory[clusterId].configure(new InetSocketAddress(clientPort), numConnections);
+
+ zkFactory[clusterId].startup(kserver); // start the zookeeper server.
+ Thread.sleep(2000);
+ kserver.startup();
+ } catch (Exception ex) {
logger.debug(ex.getLocalizedMessage());
}
}
public void stopZookeeper()
{
- standaloneServerFactory.shutdown();
- Utils.rm(new File(baseDir, zkdir));
+ for (ServerCnxnFactory zkf : zkFactory) {
+ if (zkf != null) {
+ zkf.shutdown();
+ }
+ }
+ Utils.rm(new File(baseDir, zkBaseDir));
+ }
+
+ public void startKafkaServer(int clusterid, int brokerid, int defaultPartitions)
+ {
+ Properties props = new Properties();
+ props.setProperty("broker.id", "" + brokerid);
+ props.setProperty("log.dirs", new File(baseDir, kafkadir[clusterid][brokerid]).toString());
+ props.setProperty("zookeeper.connect", "localhost:" + TEST_ZOOKEEPER_PORT[clusterid]);
+ props.setProperty("port", "" + TEST_KAFKA_BROKER_PORT[clusterid][brokerid]);
+ props.setProperty("default.replication.factor", "1");
+ // set this to 50000 to boost the performance so most test data are in memory before flush to disk
+ props.setProperty("log.flush.interval.messages", "50000");
+ if (hasMultiPartition) {
+ props.setProperty("num.partitions", "2");
+ } else {
+ props.setProperty("num.partitions", "1");
+ }
+
+ broker[clusterid][brokerid] = new KafkaServerStartable(new KafkaConfig(props));
+ broker[clusterid][brokerid].startup();
+
}
public void startKafkaServer()
{
- Properties props = new Properties();
- props.setProperty("broker.id", "0");
- props.setProperty("log.dirs", new File(baseDir, kafkadir1).toString());
- props.setProperty("zookeeper.connect", "localhost:"+TEST_ZOOKEEPER_PORT);
- props.setProperty("port", ""+TEST_KAFKA_BROKER1_PORT);
- if(hasMultiPartition){
- props.setProperty("num.partitions", "2");
- props.setProperty("default.replication.factor", "2");
- } else {
- props.setProperty("num.partitions", "1");
+ boolean[][] startable = new boolean[][] { new boolean[] { true, hasMultiPartition }, new boolean[] { hasMultiCluster, hasMultiCluster && hasMultiPartition } };
+ for (int i = 0; i < startable.length; i++) {
+ for (int j = 0; j < startable[i].length; j++) {
+ if (startable[i][j])
+ startKafkaServer(i, j, hasMultiPartition ? 2 : 1);
+ }
}
- // set this to 50000 to boost the performance so most test data are in memory before flush to disk
- props.setProperty("log.flush.interval.messages", "50000");
- kserver = new KafkaServerStartable(new KafkaConfig(props));
- kserver.startup();
- if(hasMultiPartition){
- props.setProperty("broker.id", "1");
- props.setProperty("log.dirs", new File(baseDir, kafkadir2).toString());
- props.setProperty("port", "" + TEST_KAFKA_BROKER2_PORT);
- props.setProperty("num.partitions", "2");
- props.setProperty("default.replication.factor", "2");
- kserver2 = new KafkaServerStartable(new KafkaConfig(props));
- kserver2.startup();
+
+ // startup is asynch operation. wait 2 sec for server to startup
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
}
}
public void stopKafkaServer()
{
- if(hasMultiPartition){
- kserver2.shutdown();
- kserver2.awaitShutdown();
- Utils.rm(new File(baseDir, kafkadir2));
+ for (int i = 0; i < broker.length; i++) {
+ for (int j = 0; j < broker[i].length; j++) {
+ if (broker[i][j] != null) {
+ broker[i][j].shutdown();
+ broker[i][j].awaitShutdown();
+ }
+ }
}
- kserver.shutdown();
- kserver.awaitShutdown();
- Utils.rm(new File(baseDir, kafkadir1));
+ Utils.rm(new File(baseDir, kafkaBaseDir));
}
@Before
@@ -135,22 +155,32 @@
try {
startZookeeper();
startKafkaServer();
- createTopic(TEST_TOPIC);
- }
- catch (java.nio.channels.CancelledKeyException ex) {
+ createTopic(0, TEST_TOPIC);
+ if (hasMultiCluster) {
+ createTopic(1, TEST_TOPIC);
+ }
+ } catch (java.nio.channels.CancelledKeyException ex) {
logger.debug("LSHIL {}", ex.getLocalizedMessage());
}
}
- public void createTopic(String topicName)
+ public void startZookeeper()
+ {
+ startZookeeper(0);
+ if (hasMultiCluster) {
+ startZookeeper(1);
+ }
+ }
+
+ public void createTopic(int clusterid, String topicName)
{
String[] args = new String[9];
args[0] = "--zookeeper";
- args[1] = "localhost:" + TEST_ZOOKEEPER_PORT;
+ args[1] = "localhost:" + TEST_ZOOKEEPER_PORT[clusterid];
args[2] = "--replication-factor";
args[3] = "1";
args[4] = "--partitions";
- if(hasMultiPartition){
+ if (hasMultiPartition) {
args[5] = "2";
} else {
args[5] = "1";
@@ -160,10 +190,11 @@
args[8] = "--create";
TopicCommand.main(args);
- //Right now, there is no programmatic synchronized way to create the topic. have to wait 2 sec to make sure the topic is created
+ // Right now, there is no programmatic synchronized way to create the topic. have to wait 2 sec to make sure the
+ // topic is created
// So the tests will not hit any bizarre failure
try {
- Thread.sleep(2000);
+ Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
@@ -175,8 +206,7 @@
try {
stopKafkaServer();
stopZookeeper();
- }
- catch (java.nio.channels.CancelledKeyException ex) {
+ } catch (Exception ex) {
logger.debug("LSHIL {}", ex.getLocalizedMessage());
}
}
@@ -185,5 +215,54 @@
{
this.hasMultiPartition = hasMultiPartition;
}
-}
+ public void setHasMultiCluster(boolean hasMultiCluster)
+ {
+ this.hasMultiCluster = hasMultiCluster;
+ }
+
+ public static class TestZookeeperServer extends ZooKeeperServer
+ {
+
+ public TestZookeeperServer()
+ {
+ super();
+ // TODO Auto-generated constructor stub
+ }
+
+ public TestZookeeperServer(File snapDir, File logDir, int tickTime) throws IOException
+ {
+ super(snapDir, logDir, tickTime);
+ // TODO Auto-generated constructor stub
+ }
+
+ public TestZookeeperServer(FileTxnSnapLog txnLogFactory, DataTreeBuilder treeBuilder) throws IOException
+ {
+ super(txnLogFactory, treeBuilder);
+ // TODO Auto-generated constructor stub
+ }
+
+ public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, DataTreeBuilder treeBuilder) throws IOException
+ {
+ super(txnLogFactory, tickTime, treeBuilder);
+ // TODO Auto-generated constructor stub
+ }
+
+ public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, DataTreeBuilder treeBuilder, ZKDatabase zkDb)
+ {
+ super(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, treeBuilder, zkDb);
+ // TODO Auto-generated constructor stub
+ }
+
+ @Override
+ protected void registerJMX()
+ {
+ }
+
+ @Override
+ protected void unregisterJMX()
+ {
+ }
+
+ }
+}
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java
index 83734e9..dc06b7f 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java
@@ -76,7 +76,6 @@
dataGeneratorThread = new Thread("String Generator")
{
@Override
- @SuppressWarnings("SleepWhileInLoop")
public void run()
{
try {
@@ -119,7 +118,7 @@
* @throws Exception
*/
@Test
- @SuppressWarnings({"SleepWhileInLoop", "empty-statement", "rawtypes"})
+ @SuppressWarnings({"rawtypes", "unchecked"})
public void testKafkaOutputOperator() throws Exception
{
//initialize the latch to synchronize the threads
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaPartitionableInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaPartitionableInputOperatorTest.java
index ebb8cce..34c0199 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaPartitionableInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaPartitionableInputOperatorTest.java
@@ -22,22 +22,41 @@
import org.junit.Assert;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.*;
import com.datatorrent.api.DAG.Locality;
/**
- * A test to verify the input operator will be automatically partitioned per kafka partition
- * This test is launching its own Kafka cluster.
+ * A test to verify the input operator will be automatically partitioned per kafka partition This test is launching its
+ * own Kafka cluster.
*/
+@RunWith(Parameterized.class)
public class KafkaPartitionableInputOperatorTest extends KafkaOperatorTestBase
{
- public KafkaPartitionableInputOperatorTest()
+ private int totalBrokers = 0;
+
+ @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}")
+ public static Collection<Boolean[]> testScenario()
+ {
+ return Arrays.asList(new Boolean[][] { { true, false }, // multi cluster with single partition
+ { true, true }, // multi cluster with multi partitions
+ { false, true }, // single cluster with multi partitions
+ { false, false } // single cluster with single partition
+ });
+ }
+
+ public KafkaPartitionableInputOperatorTest(boolean hasMultiCluster, boolean hasMultiPartition)
{
// This class want to initialize several kafka brokers for multiple partitions
- hasMultiPartition = true;
+ this.hasMultiCluster = hasMultiCluster;
+ this.hasMultiPartition = hasMultiPartition;
+ int cluster = 1 + (hasMultiCluster ? 1 : 0);
+ totalBrokers = (1 + (hasMultiPartition ? 1 : 0)) * cluster;
+
}
static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaPartitionableInputOperatorTest.class);
@@ -89,14 +108,11 @@
}
/**
- * Test AbstractKafkaSinglePortInputOperator (i.e. an input adapter for
- * Kafka, aka consumer). This module receives data from an outside test
- * generator through Kafka message bus and feed that data into Malhar
- * streaming platform.
+ * Test AbstractKafkaSinglePortInputOperator (i.e. an input adapter for Kafka, aka consumer). This module receives
+ * data from an outside test generator through Kafka message bus and feed that data into Malhar streaming platform.
*
- * [Generate message and send that to Kafka message bus] ==> [Receive that
- * message through Kafka input adapter(i.e. consumer) and send using
- * emitTuples() interface on output port during onMessage call]
+ * [Generate message and send that to Kafka message bus] ==> [Receive that message through Kafka input adapter(i.e.
+ * consumer) and send using emitTuples() interface on output port during onMessage call]
*
*
* @throws Exception
@@ -114,21 +130,21 @@
{
// Create template high-level consumer
Properties props = new Properties();
- props.put("zookeeper.connect", "localhost:2182");
props.put("group.id", "main_group");
HighlevelKafkaConsumer consumer = new HighlevelKafkaConsumer(props);
testPartitionableInputOperator(consumer);
}
- public void testPartitionableInputOperator(KafkaConsumer consumer) throws Exception{
+ public void testPartitionableInputOperator(KafkaConsumer consumer) throws Exception
+ {
- // Set to 2 because we want to make sure END_TUPLE from both 2 partitions are received
- latch = new CountDownLatch(2);
+ // each broker should get a END_TUPLE message
+ latch = new CountDownLatch(totalBrokers);
int totalCount = 10000;
// Start producer
- KafkaTestProducer p = new KafkaTestProducer(TEST_TOPIC, true);
+ KafkaTestProducer p = new KafkaTestProducer(TEST_TOPIC, hasMultiPartition, hasMultiCluster);
p.setSendCount(totalCount);
new Thread(p).start();
@@ -137,20 +153,18 @@
DAG dag = lma.getDAG();
// Create KafkaSinglePortStringInputOperator
- PartitionableKafkaSinglePortStringInputOperator node = dag.addOperator("Kafka message consumer", PartitionableKafkaSinglePortStringInputOperator.class);
+ KafkaSinglePortStringInputOperator node = dag.addOperator("Kafka message consumer", KafkaSinglePortStringInputOperator.class);
node.setInitialPartitionCount(1);
- //set topic
+ // set topic
consumer.setTopic(TEST_TOPIC);
- //set the brokerlist used to initialize the partition
- Set<String> brokerSet = new HashSet<String>();
- brokerSet.add("localhost:9092");
- brokerSet.add("localhost:9093");
- consumer.setBrokerSet(brokerSet);
consumer.setInitialOffset("earliest");
node.setConsumer(consumer);
+ String clusterString = "cluster1::localhost:" + TEST_ZOOKEEPER_PORT[0] + (hasMultiCluster ? ";cluster2::localhost:" + TEST_ZOOKEEPER_PORT[1] : "");
+ node.setZookeeper(clusterString);
+
// Create Test tuple collector
CollectorModule<String> collector = dag.addOperator("TestMessageCollector", new CollectorModule<String>());
@@ -164,7 +178,7 @@
lc.runAsync();
// Wait 30s for consumer finish consuming all the messages
- Assert.assertTrue("TIMEOUT: 30s ", latch.await(30000, TimeUnit.MILLISECONDS));
+ Assert.assertTrue("TIMEOUT: 40s ", latch.await(40000, TimeUnit.MILLISECONDS));
// Check results
Assert.assertEquals("Collections size", 1, collections.size());
@@ -173,6 +187,9 @@
p.close();
lc.shutdown();
+ // kafka has a bug shutdown connector you have to make sure kafka client resource has been cleaned before clean the broker
+ Thread.sleep(5000);
}
+
}
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestProducer.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestProducer.java
index 810b653..c0337f3 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestProducer.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestProducer.java
@@ -29,11 +29,13 @@
{
// private static final Logger logger = LoggerFactory.getLogger(KafkaTestProducer.class);
private final kafka.javaapi.producer.Producer<String, String> producer;
+ private final kafka.javaapi.producer.Producer<String, String> producer1;
private final String topic;
private int sendCount = 20;
// to generate a random int as a key for partition
private final Random rand = new Random();
private boolean hasPartition = false;
+ private boolean hasMultiCluster = false;
private List<String> messages;
private String producerType = "async";
@@ -52,17 +54,17 @@
this.messages = messages;
}
- private ProducerConfig createProducerConfig()
+ private ProducerConfig createProducerConfig(int cid)
{
Properties props = new Properties();
props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
props.setProperty("key.serializer.class", "kafka.serializer.StringEncoder");
// props.put("metadata.broker.list", "localhost:"+KafkaOperatorTestBase.TEST_KAFKA_BROKER1_PORT );
if(hasPartition){
- props.put("metadata.broker.list", "localhost:"+KafkaOperatorTestBase.TEST_KAFKA_BROKER1_PORT + ",localhost:" + KafkaOperatorTestBase.TEST_KAFKA_BROKER2_PORT);
+ props.put("metadata.broker.list", "localhost:"+KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid][0] + ",localhost:" + KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid][1]);
props.setProperty("partitioner.class", KafkaTestPartitioner.class.getCanonicalName());
} else {
- props.put("metadata.broker.list", "localhost:"+KafkaOperatorTestBase.TEST_KAFKA_BROKER1_PORT );
+ props.put("metadata.broker.list", "localhost:"+KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid][0] );
}
props.setProperty("topic.metadata.refresh.interval.ms", "20000");
@@ -76,13 +78,23 @@
this(topic, false);
}
- public KafkaTestProducer(String topic, boolean hasPartition)
+ public KafkaTestProducer(String topic, boolean hasPartition, boolean hasMultiCluster)
{
// Use random partitioner. Don't need the key type. Just set it to Integer.
// The message is of type String.
this.topic = topic;
this.hasPartition = hasPartition;
- producer = new Producer<String, String>(createProducerConfig());
+ this.hasMultiCluster = hasMultiCluster;
+ producer = new Producer<String, String>(createProducerConfig(0));
+ if(hasMultiCluster){
+ producer1 = new Producer<String, String>(createProducerConfig(1));
+ } else {
+ producer1 = null;
+ }
+ }
+
+ public KafkaTestProducer(String topic, boolean hasPartition) {
+ this(topic, hasPartition, false);
}
private void generateMessages()
@@ -92,15 +104,25 @@
while (messageNo <= sendCount) {
String messageStr = "Message_" + messageNo;
int k = rand.nextInt(100);
- producer.send(new KeyedMessage<String, String>(topic, "" + k, messageStr));
+ producer.send(new KeyedMessage<String, String>(topic, "" + k, "c1" + messageStr));
+ if(hasMultiCluster){
+ messageNo++;
+ producer1.send(new KeyedMessage<String, String>(topic, "" + k, "c2" + messageStr));
+ }
messageNo++;
// logger.debug(String.format("Producing %s", messageStr));
}
// produce the end tuple to let the test input operator know it's done produce messages
producer.send(new KeyedMessage<String, String>(topic, "" + 0, KafkaOperatorTestBase.END_TUPLE));
+ if(hasMultiCluster) {
+ producer1.send(new KeyedMessage<String, String>(topic, "" + 0, KafkaOperatorTestBase.END_TUPLE));
+ }
if(hasPartition){
// Send end_tuple to other partition if it exist
producer.send(new KeyedMessage<String, String>(topic, "" + 1, KafkaOperatorTestBase.END_TUPLE));
+ if(hasMultiCluster) {
+ producer1.send(new KeyedMessage<String, String>(topic, "" + 1, KafkaOperatorTestBase.END_TUPLE));
+ }
}
}
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java
index fa7b3a1..43e40d0 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java
@@ -18,12 +18,10 @@
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -44,7 +42,9 @@
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.Operator;
import com.datatorrent.api.DAG.Locality;
-import com.datatorrent.contrib.kafka.AbstractPartitionableKafkaInputOperator.PartitionStrategy;
+import com.datatorrent.contrib.kafka.AbstractKafkaInputOperator.PartitionStrategy;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
public class OffsetManagerTest extends KafkaOperatorTestBase
{
@@ -64,7 +64,7 @@
public static class TestOffsetManager implements OffsetManager{
- private final transient Map<Integer, Long> offsets = Collections.synchronizedMap(new HashMap<Integer, Long>());
+ private final transient Map<KafkaPartition, Long> offsets = Collections.synchronizedMap(new HashMap<KafkaPartition, Long>());
private String filename = null;
@@ -78,15 +78,17 @@
}
@Override
- public Map<Integer, Long> loadInitialOffsets()
+ public Map<KafkaPartition, Long> loadInitialOffsets() // seeds partitions 0 and 1 of TEST_TOPIC at offset 10
{
- offsets.put(0, 10l);
- offsets.put(1, 10l);
+ KafkaPartition kp0 = new KafkaPartition(TEST_TOPIC, 0);
+ KafkaPartition kp1 = new KafkaPartition(TEST_TOPIC, 1);
+ offsets.put(kp0, 10L); // uppercase L: "10l" is easily misread as "101"
+ offsets.put(kp1, 10L);
return offsets;
}
@Override
- public void updateOffsets(Map<Integer, Long> offsetsOfPartitions)
+ public void updateOffsets(Map<KafkaPartition, Long> offsetsOfPartitions)
{
offsets.putAll(offsetsOfPartitions);
@@ -95,7 +97,7 @@
Path tmpFile = new Path(filename + ".tmp");
Path dataFile = new Path(filename);
FSDataOutputStream out = fs.create(tmpFile, true);
- for (Entry<Integer, Long> e : offsets.entrySet()) {
+ for (Entry<KafkaPartition, Long> e : offsets.entrySet()) {
out.writeBytes(e.getKey() +", " + e.getValue() + "\n");
}
out.close();
@@ -112,8 +114,8 @@
if (latch.getCount() == 1) {
// when latch is 1, it means consumer has consumed all the messages
int count = 0;
- for (Entry<Integer, Long> entry : offsets.entrySet()) {
- count += entry.getValue();
+ for (long entry : offsets.values()) {
+ count += entry;
}
if (count == totalCount + 2) {
// wait until all offsets add up to totalCount messages + 2 control END_TUPLE
@@ -220,7 +222,7 @@
DAG dag = lma.getDAG();
// Create KafkaSinglePortStringInputOperator
- PartitionableKafkaSinglePortStringInputOperator node = dag.addOperator("Kafka message consumer", PartitionableKafkaSinglePortStringInputOperator.class);
+ KafkaSinglePortStringInputOperator node = dag.addOperator("Kafka message consumer", KafkaSinglePortStringInputOperator.class);
TestOffsetManager tfm = new TestOffsetManager();
@@ -234,11 +236,10 @@
//set topic
consumer.setTopic(TEST_TOPIC);
- //set the brokerlist used to initialize the partition
- Set<String> brokerSet = new HashSet<String>();
- brokerSet.add("localhost:9092");
- brokerSet.add("localhost:9093");
- consumer.setBrokerSet(brokerSet);
+ //set the zookeeper list used to initialize the partition
+ SetMultimap<String, String> zookeeper = HashMultimap.create();
+ zookeeper.put(KafkaPartition.DEFAULT_CLUSTERID, "localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
+ consumer.setZookeeper(zookeeper);
consumer.setInitialOffset("earliest");
node.setConsumer(consumer);
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/SimpleKakfaConsumerTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/SimpleKakfaConsumerTest.java
index 509a12c..c8ad7f6 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/SimpleKakfaConsumerTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/SimpleKakfaConsumerTest.java
@@ -1,16 +1,15 @@
package com.datatorrent.contrib.kafka;
+import com.datatorrent.lib.util.TestUtils;
+import com.esotericsoftware.kryo.Kryo;
import org.junit.Assert;
import org.junit.Test;
-import java.util.HashMap;
-import java.util.HashSet;
-
public class SimpleKakfaConsumerTest
{
@Test
- public void cloneTest()
+ public void cloneTest() throws Exception
{
SimpleKafkaConsumer kc = new SimpleKafkaConsumer();
int bufferSize = 1000;
@@ -22,7 +21,7 @@
kc.setTopic("test_topic");
kc.setClientId("test_clientid");
- SimpleKafkaConsumer kcClone = (SimpleKafkaConsumer) kc.cloneConsumer(new HashSet(), new HashMap());
+ SimpleKafkaConsumer kcClone = TestUtils.clone(new Kryo(), kc);
Assert.assertEquals("Buffer size is " + bufferSize, bufferSize, kcClone.getBufferSize());
Assert.assertEquals("Cache size is " + cacheSize, cacheSize, kcClone.getCacheSize());
Assert.assertEquals("Clint id is same", kc.getClientId(), kcClone.getClientId());
diff --git a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericDimensionsApplication.java b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericDimensionsApplication.java
index 87e2dcb..ad5d45d 100644
--- a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericDimensionsApplication.java
+++ b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericDimensionsApplication.java
@@ -16,11 +16,13 @@
package com.datatorrent.demos.dimensions.generic;
import com.datatorrent.api.Context;
+
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.contrib.kafka.KafkaJsonEncoder;
import com.datatorrent.contrib.kafka.KafkaSinglePortOutputOperator;
import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;
@@ -110,6 +112,7 @@
DimensionStoreOperator store = dag.addOperator("Store", DimensionStoreOperator.class);
KafkaSinglePortStringInputOperator queries = dag.addOperator("Query", new KafkaSinglePortStringInputOperator());
KafkaSinglePortOutputOperator<Object, Object> queryResult = dag.addOperator("QueryResult", new KafkaSinglePortOutputOperator<Object, Object>());
+ queryResult.getConfigProperties().put("serializer.class", KafkaJsonEncoder.class.getName());
dag.setInputPortAttribute(converter.input, Context.PortContext.PARTITION_PARALLEL, true);
dag.setInputPortAttribute(dimensions.data, Context.PortContext.PARTITION_PARALLEL, true);
diff --git a/demos/dimensions/src/main/resources/META-INF/properties.xml b/demos/dimensions/src/main/resources/META-INF/properties.xml
index 087e010..41fc5c8 100644
--- a/demos/dimensions/src/main/resources/META-INF/properties.xml
+++ b/demos/dimensions/src/main/resources/META-INF/properties.xml
@@ -70,7 +70,7 @@
<!-- Ensure that Kafka connect address is set locally - deployment specific configuration -->
<!--<property>-->
- <!--<name>dt.operator.Query.brokerSet</name>-->
+ <!--<name>dt.operator.Query.zookeeper</name>-->
<!--<!– <value>hostname:port</value> –>-->
<!--</property>-->
<!--<property>-->
diff --git a/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/ads/HDHTApplicationTest.java b/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/ads/HDHTApplicationTest.java
index 4881df3..f51c74e 100644
--- a/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/ads/HDHTApplicationTest.java
+++ b/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/ads/HDHTApplicationTest.java
@@ -49,8 +49,8 @@
FileUtils.deleteDirectory(new File(kafkaLauncher.baseDir));
kafkaLauncher.startZookeeper();
kafkaLauncher.startKafkaServer();
- kafkaLauncher.createTopic(kafkaQueryTopic);
- kafkaLauncher.createTopic(kafkaQueryResultTopic);
+ kafkaLauncher.createTopic(0, kafkaQueryTopic);
+ kafkaLauncher.createTopic(0, kafkaQueryResultTopic);
}
@After
@@ -71,7 +71,7 @@
conf.set("dt.operator.Store.fileStore.basePath", "target/HDSApplicationTestStore");
- conf.set("dt.operator.Query.brokerSet", "localhost:9092");
+ conf.set("dt.operator.Query.zookeeper", "localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
conf.set("dt.operator.Query.topic", kafkaQueryTopic);
conf.set("dt.operator.QueryResult.topic", kafkaQueryResultTopic);
diff --git a/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/ads/KafkaApplicationTest.java b/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/ads/KafkaApplicationTest.java
index b4d2ee9..353b267 100644
--- a/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/ads/KafkaApplicationTest.java
+++ b/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/ads/KafkaApplicationTest.java
@@ -45,7 +45,7 @@
FileUtils.deleteDirectory(new File(kafkaLauncher.baseDir));
kafkaLauncher.startZookeeper();
kafkaLauncher.startKafkaServer();
- kafkaLauncher.createTopic(kafkaTopic);
+ kafkaLauncher.createTopic(0, kafkaTopic);
}
@After
diff --git a/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/generic/GenericAppTest.java b/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/generic/GenericAppTest.java
index 41ff992..c5a6ffe 100644
--- a/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/generic/GenericAppTest.java
+++ b/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/generic/GenericAppTest.java
@@ -50,8 +50,8 @@
FileUtils.deleteDirectory(new File(kafkaLauncher.baseDir));
kafkaLauncher.startZookeeper();
kafkaLauncher.startKafkaServer();
- kafkaLauncher.createTopic(kafkaQueryTopic);
- kafkaLauncher.createTopic(kafkaQueryResultTopic);
+ kafkaLauncher.createTopic(0, kafkaQueryTopic);
+ kafkaLauncher.createTopic(0, kafkaQueryResultTopic);
}
@After
@@ -65,11 +65,11 @@
{
LocalMode lma = LocalMode.newInstance();
Configuration conf = new Configuration(false);
conf.addResource("META-INF/properties.xml");
conf.set("dt.operator.DimensionsComputation.attr.APPLICATION_WINDOW_COUNT", "1");
conf.set("dt.operator.QueryResult.prop.configProperties(metadata.broker.list)", "localhost:9092");
conf.set("dt.operator.DimensionsStore.fileStore.basePath", "target/HDSApplicationTestStore");
- conf.set("dt.operator.Query.brokerSet", "localhost:9092");
+ conf.set("dt.operator.Query.zookeeper", "localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]); // zookeeper list replaces the removed brokerSet property
conf.set("dt.operator.Query.topic", kafkaQueryTopic);
conf.set("dt.operator.QueryResult.topic", kafkaQueryResultTopic);
conf.set("dt.operator.DimensionsComputation.attr.APPLICATION_WINDOW_COUNT", "2");