blob: 1218f4aa04e1d1c3545726361cc45ffd04a0061b [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package com.datatorrent.contrib.kafka;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Operator.ActivationListener;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.Stats;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.lib.util.KryoCloneUtils;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.cluster.Broker;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.Message;
import kafka.message.MessageAndOffset;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.validation.Valid;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static com.datatorrent.contrib.kafka.KafkaConsumer.KafkaMeterStatsUtil.getOffsetsForPartitions;
* This is a base implementation of a Kafka input operator, which consumes data from Kafka message bus. 
* Subclasses should implement the method for emitting tuples to downstream operators.
* It will be dynamically partitioned based on the upstream kafka partition.
* <p>
* <b>Partition Strategy:</b>
* <p><b>1. ONE_TO_ONE partition</b> Each operator partition will consume from only one kafka partition </p>
* <p><b>2. ONE_TO_MANY partition</b> Each operator partition consumes from multiple kafka partitions with some hard ingestion rate limit</p>
* <p><b>3. ONE_TO_MANY_HEURISTIC partition</b>(Not implemented yet) Each operator partition consumes from multiple kafka partitions and the partition number depends on a heuristic function (real-time bottleneck)</p>
* <p><b>Note:</b> ONE_TO_MANY partition only supports the simple kafka consumer because:</p>
* <p> 1) high-level consumer can only balance the number of brokers it consumes from rather than the actual load from each broker</p>
* <p> 2) high-level consumer can not reset offset once it's committed so the tuples are not replayable </p>
* <p></p>
* <br>
* <br>
* <b>Basic Algorithm:</b>
* <p>1.Pull the metadata(how many partitions) of the topic from brokerList of {@link KafkaConsumer}</p>
* <p>2.cloneConsumer method is used to initialize the new {@link KafkaConsumer} instance for the new partition operator</p>
* <p>3.cloneOperator method is used to initialize the new {@link AbstractKafkaInputOperator} instance for the new partition operator</p>
* <p>4.ONE_TO_MANY partition uses the first-fit decreasing algorithm to minimize the number of operator partitions</p>
* <br>
* <br>
* <b>Load balance:</b> refer to {@link SimpleKafkaConsumer} and {@link HighlevelKafkaConsumer} <br>
* <b>Kafka partition failover:</b> refer to {@link SimpleKafkaConsumer} and {@link HighlevelKafkaConsumer}
* <br>
* <br>
* <b>Self adjust to Kafka partition change:</b>
* <p><b>EACH</b> operator partition periodically checks for leader changes of the broker(s) it consumes from and adjusts its connections without repartitioning</p>
* <p><b>ONLY APPMASTER</b> operator periodically checks the overall kafka partition layout and adds operator partitions when kafka partitions are added (kafka does not support partition deletion for now)</p>
* <br>
* <br>
* </p>
* Properties:<br>
* <b>tuplesBlast</b>: Number of tuples emitted in each burst<br>
* <b>bufferSize</b>: Size of holding buffer<br>
* <br>
* Compile time checks:<br>
* Class derived from this has to implement the abstract method emitTuple() <br>
* <br>
* Run time checks:<br>
* None<br>
* <br>
* Benchmarks:<br>
* TBD<br>
* <br>
* Each operator can consume 1 topic from multiple partitions and clusters<br>
* </p>
* @displayName Abstract Kafka Input
* @category Messaging
* @tags input operator
* @since 0.3.2
@OperatorAnnotation(partitionable = true)
public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implements InputOperator, ActivationListener<OperatorContext>, Operator.CheckpointNotificationListener, Partitioner<AbstractKafkaInputOperator<K>>, StatsListener
// Class-wide SLF4J logger.
private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaInputOperator.class);
// Per-window cap on the number of emitted tuples; emitTuples() only applies it when the value is > 0.
private int maxTuplesPerWindow = Integer.MAX_VALUE;
// Per-window cap on the summed message size; see getMaxTotalMsgSizePerWindow().
private long maxTotalMsgSizePerWindow = Long.MAX_VALUE;
// Per-window counters consulted by emitTuples(); beginWindow() assigns 0 to both.
private transient int emitCount = 0;
private transient long emitTotalMsgSize = 0;
// Set to a NoopWindowDataManager by the constructor; replaceable through setWindowDataManager().
protected WindowDataManager windowDataManager;
// Id of the window most recently passed to beginWindow().
protected transient long currentWindowId;
// Operator id read from the OperatorContext in setup().
protected transient int operatorId;
// Per kafka partition recovery pair for the current window. replay() uses the left value as the
// fetch offset and the right value as the number of messages to re-read.
protected final transient Map<KafkaPartition, MutablePair<Long, Integer>> currentWindowRecoveryState;
* Offsets that are checkpointed for recovery
protected Map<KafkaPartition, Long> offsetStats = new HashMap<KafkaPartition, Long>();
* offset history with window id
protected transient List<Pair<Long, Map<KafkaPartition, Long>>> offsetTrackHistory = new LinkedList<>();
// Context captured in setup(); activate() reads ACTIVATION_WINDOW_ID from it.
private transient OperatorContext context = null;
// By default the partition policy is 1:1
public PartitionStrategy strategy = PartitionStrategy.ONE_TO_ONE;
// Deprecated: Please don't use this property.
private long msgRateUpperBound = Long.MAX_VALUE;
// Deprecated: Please don't use this property.
private long byteRateUpperBound = Long.MAX_VALUE;
// Store the current operator partition topology
private transient List<PartitionInfo> currentPartitionInfo = Lists.newLinkedList();
// Store the current collected kafka consumer stats
private transient Map<Integer, List<KafkaConsumer.KafkaMeterStats>> kafkaStatsHolder = new HashMap<Integer, List<KafkaConsumer.KafkaMeterStats>>();
// Optional; when non-null it supplies the initial offsets in definePartitions().
private OffsetManager offsetManager = null;
// Minimal interval between 2 (re)partition actions
private long repartitionInterval = 30000L;
// Minimal interval between checking collected stats and decide whether it needs to repartition or not.
// And minimal interval between 2 offset updates
private long repartitionCheckInterval = 5000L;
// Wall-clock timestamps (System.currentTimeMillis()) of the last stats check and last (re)partition.
private transient long lastCheckTime = 0L;
private transient long lastRepartitionTime = 0L;
// A list store the newly discovered partitions
private transient List<KafkaPartition> newWaitingPartition = new LinkedList<KafkaPartition>();
// Message polled by emitTuples() but held back because the per-window size limit would be exceeded.
private transient KafkaConsumer.KafkaMessage pendingMessage;
// Number of buckets used to spread kafka partitions in the ONE_TO_MANY branch of definePartitions().
private int initialPartitionCount = 1;
// Consumer used to read from kafka; a SimpleKafkaConsumer unless replaced through setConsumer().
protected KafkaConsumer consumer = new SimpleKafkaConsumer();
// Default constructor: installs a no-op window data manager and creates the empty
// per-window recovery map (the map field is final, so it has to be assigned here).
public AbstractKafkaInputOperator()
windowDataManager = new WindowDataManager.NoopWindowDataManager();
currentWindowRecoveryState = new HashMap<KafkaPartition, MutablePair<Long, Integer>>();
* Any concrete class derived from KafkaInputOperator has to implement this method to emit tuples to an output port.
protected abstract void emitTuple(Message message);
* Concrete class derived from KafkaInputOperator should implement this method if it wants to access kafka offset and partitionId along with kafka message.
protected void emitTuple(KafkaConsumer.KafkaMessage message)
// Returns the per-window tuple limit.
public int getMaxTuplesPerWindow()
return maxTuplesPerWindow;
// Sets the per-window tuple limit. emitTuples() ignores the limit when the value is not positive.
public void setMaxTuplesPerWindow(int maxTuplesPerWindow)
this.maxTuplesPerWindow = maxTuplesPerWindow;
* Get the maximum total size of messages to be transmitted per window. When the sum of the message sizes transmitted
* in a window reaches this limit no more messages are transmitted till the next window. There is one exception
* however, if the size of the first message in a window is greater than the limit it is still transmitted so that the
* processing of messages doesn't get stuck.
* @return The maximum for the total size
public long getMaxTotalMsgSizePerWindow() {
return maxTotalMsgSizePerWindow;
* Set the maximum total size of messages to be transmitted per window. See {@link #getMaxTotalMsgSizePerWindow()} for
* more description about this property.
* @param maxTotalMsgSizePerWindow The maximum for the total size
public void setMaxTotalMsgSizePerWindow(long maxTotalMsgSizePerWindow) {
this.maxTotalMsgSizePerWindow = maxTotalMsgSizePerWindow;
// Operator setup: logs the consumer configuration, derives restart offsets from the
// checkpointed offsetStats, remembers the context and operator id, and rejects the
// unsupported combination of a high-level consumer with a real WindowDataManager.
public void setup(OperatorContext context)
logger.debug("consumer {} topic {} cacheSize {}", consumer, consumer.getTopic(), consumer.getCacheSize());
// reset the offsets to checkpointed one
if (consumer instanceof SimpleKafkaConsumer && !offsetStats.isEmpty()) {
Map<KafkaPartition, Long> currentOffsets = new HashMap<>();
// Increment the offsets and set it to consumer
// (offsetStats holds the offset of the last emitted message, see emitTuples(), hence the + 1)
// NOTE(review): the statement handing currentOffsets to the consumer is not visible here - confirm.
for (Map.Entry<KafkaPartition, Long> e: offsetStats.entrySet()) {
currentOffsets.put(e.getKey(), e.getValue() + 1);
this.context = context;
operatorId = context.getId();
// Anything other than the no-op manager is treated as a request for idempotent replay,
// which is only possible with the simple consumer.
if(consumer instanceof HighlevelKafkaConsumer && !(windowDataManager instanceof WindowDataManager.NoopWindowDataManager)) {
throw new RuntimeException("Idempotency is not supported for High Level Kafka Consumer");
public void teardown()
// Start of a streaming window: records the window id and zeroes the per-window emit counters.
public void beginWindow(long windowId)
currentWindowId = windowId;
// Window ids at or below the largest completed window were already recorded by the data manager.
// NOTE(review): the body of this branch is not visible here; presumably it calls replay(windowId) - confirm.
if (windowId <= windowDataManager.getLargestCompletedWindow()) {
emitCount = 0;
emitTotalMsgSize = 0;
// Re-reads the messages recorded for an already completed window directly from kafka.
// For each recovered kafka partition it locates the partition metadata, fetches from the
// recorded start offset (pair.left) using a SimpleConsumer built from the partition leader,
// and walks the returned messages until the recorded message count (pair.right) is reached.
// offsetStats is updated along the way. IOExceptions are rethrown as RuntimeException("replay").
// NOTE(review): several statements are truncated in this extract (the recoveredData source,
// the pm assignments, the fetch request build, the SimpleConsumer host); the code is left untouched.
protected void replay(long windowId)
try {
Map<KafkaPartition, MutablePair<Long, Integer>> recoveredData = (Map<KafkaPartition, MutablePair<Long, Integer>>)
if (recoveredData != null) {
Map<String, List<PartitionMetadata>> pms = KafkaMetadataUtil.getPartitionsForTopic(getConsumer().brokers, getConsumer().topic);
if (pms != null) {
// Unchecked cast: assumes the simple consumer is in use.
SimpleKafkaConsumer cons = (SimpleKafkaConsumer) getConsumer();
// add all partition requests together in one Fetch request
FetchRequestBuilder frb = new FetchRequestBuilder().clientId(cons.getClientId());
for (Map.Entry<KafkaPartition, MutablePair<Long, Integer>> rc : recoveredData.entrySet()) {
KafkaPartition kp = rc.getKey();
// Metadata is keyed by cluster id; scan that cluster's list for the matching partition id.
List<PartitionMetadata> pmsVal = pms.get(kp.getClusterId());
Iterator<PartitionMetadata> pmIterator = pmsVal.iterator();
PartitionMetadata pm =;
while (pm.partitionId() != kp.getPartitionId()) {
if (!pmIterator.hasNext())
pm =;
if (pm.partitionId() != kp.getPartitionId())
Broker bk = pm.leader();
// Fetch starts at the offset recorded for this partition in the recovered window.
frb.addFetch(consumer.topic, rc.getKey().getPartitionId(), rc.getValue().left, cons.getBufferSize());
FetchRequest req =;
// NOTE(review): no ksc.close() is visible in this extract - confirm the consumer is released.
SimpleConsumer ksc = new SimpleConsumer(, bk.port(), cons.getTimeout(), cons.getBufferSize(), cons.getClientId());
FetchResponse fetchResponse = ksc.fetch(req);
Integer count = 0;
for (MessageAndOffset msg : fetchResponse.messageSet(consumer.topic, kp.getPartitionId())) {
KafkaConsumer.KafkaMessage kafkaMessage = new KafkaConsumer.KafkaMessage(kp, msg.message(), msg.offset());
offsetStats.put(kp, msg.offset());
count = count + 1;
// Stop condition: as many messages re-read as were recorded for this partition.
if (count.equals(rc.getValue().right))
if(windowId == windowDataManager.getLargestCompletedWindow()) {
// Start the consumer at the largest recovery window
SimpleKafkaConsumer cons = (SimpleKafkaConsumer)getConsumer();
// Set the offset positions to the consumer
Map<KafkaPartition, Long> currentOffsets = new HashMap<KafkaPartition, Long>(cons.getCurrentOffsets());
// Increment the offsets
for (Map.Entry<KafkaPartition, Long> e: offsetStats.entrySet()) {
currentOffsets.put(e.getKey(), e.getValue() + 1);
catch (IOException e) {
throw new RuntimeException("replay", e);
// End of a streaming window: snapshots the current offsets into the offset history (simple
// consumer only) and, for windows newer than the largest completed one, persists recovery
// state. IOExceptions are rethrown as RuntimeException("saving recovery").
public void endWindow()
//TODO depends on APEX-78: only the history of windows that still need to be committed has to be kept
if (getConsumer() instanceof SimpleKafkaConsumer) {
// Copy, so later updates to offsetStats do not alter the snapshot stored for this window.
Map<KafkaPartition, Long> carryOn = new HashMap<>(offsetStats);
offsetTrackHistory.add(Pair.of(currentWindowId, carryOn));
if (currentWindowId > windowDataManager.getLargestCompletedWindow()) {
// NOTE(review): the save call on the next line is truncated in this extract; left as-is.
try {, currentWindowId);
catch (IOException e) {
throw new RuntimeException("saving recovery", e);
public void checkpointed(long windowId)
// commit the consumer offset
public void beforeCheckpoint(long windowId)
// Called when a window is committed. With the simple consumer it walks the offset history,
// distinguishing entries older than the committed window from the entry for exactly that
// window, whose offsets are logged at debug level. IOExceptions are rethrown as
// RuntimeException("deleting state").
// NOTE(review): the bodies of both branches and of the try block are not visible in this extract.
public void committed(long windowId)
if ((getConsumer() instanceof SimpleKafkaConsumer)) {
SimpleKafkaConsumer cons = (SimpleKafkaConsumer)getConsumer();
// Explicit iterator rather than for-each; presumably entries are removed while iterating - confirm.
for (Iterator<Pair<Long, Map<KafkaPartition, Long>>> iter = offsetTrackHistory.iterator(); iter.hasNext(); ) {
Pair<Long, Map<KafkaPartition, Long>> item =;
if (item.getLeft() < windowId) {
} else if (item.getLeft() == windowId) {
if (logger.isDebugEnabled()) {
logger.debug("report offsets {} ", Joiner.on(';').withKeyValueSeparator("=").join(item.getRight()));
try {
catch (IOException e) {
throw new RuntimeException("deleting state", e);
// Activation hook. Note that it reads ACTIVATION_WINDOW_ID from the context field saved in
// setup(), not from the ctx parameter.
public void activate(OperatorContext ctx)
// Replay state: a real (non-stateless) activation window that lies before the largest completed window.
if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID &&
context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowDataManager.getLargestCompletedWindow()) {
// If it is a replay state, don't start the consumer
// Don't start thread here!
// # of kafka_consumer_threads depends on the type of kafka client and the message
// metadata(topic/partition/replica) layout
public void deactivate()
// Emits buffered kafka messages for the current window, honouring the per-window tuple
// count and total-size limits, and records per-partition offsets and recovery state.
// NOTE(review): brace-only lines and some statements (e.g. the emit call and the emitCount
// increment) are not visible in this extract.
public void emitTuples()
// Windows at or below the largest completed window are handled through replay.
// NOTE(review): the branch body is not visible; presumably an early return - confirm.
if (currentWindowId <= windowDataManager.getLargestCompletedWindow()) {
// Candidates: everything the consumer has buffered plus a message held over from a previous call.
int count = consumer.messageSize() + ((pendingMessage != null) ? 1 : 0);
// A non-positive maxTuplesPerWindow disables the tuple-count cap.
if (maxTuplesPerWindow > 0) {
count = Math.min(count, maxTuplesPerWindow - emitCount);
KafkaConsumer.KafkaMessage message = null;
for (int i = 0; i < count; i++) {
// The held-over message is taken first, before polling the consumer again.
if (pendingMessage != null) {
message = pendingMessage;
pendingMessage = null;
} else {
message = consumer.pollMessage();
// If the total size transmitted in the window will be exceeded don't transmit anymore messages in this window
// Make an exception for the case when no message has been transmitted in the window and transmit at least one
// message even if the condition is violated so that the processing doesn't get stuck
if ((emitCount > 0) && ((maxTotalMsgSizePerWindow - emitTotalMsgSize) < message.msg.size())) {
// Hold the message back so a later call can emit it.
pendingMessage = message;
emitTotalMsgSize += message.msg.size();
// Remember the last emitted offset per kafka partition (checkpointed through offsetStats).
offsetStats.put(message.kafkaPart, message.offSet);
MutablePair<Long, Integer> offsetAndCount = currentWindowRecoveryState.get(message.kafkaPart);
// First message of this partition in the window: record its offset with a count of 1.
if(offsetAndCount == null) {
currentWindowRecoveryState.put(message.kafkaPart, new MutablePair<Long, Integer>(message.offSet, 1));
} else {
// Replaces the kafka consumer used by this operator.
public void setConsumer(K consumer)
this.consumer = consumer;
// Returns the consumer as the base KafkaConsumer type (not K); callers in this class cast
// to SimpleKafkaConsumer after an instanceof check where they need the subtype.
public KafkaConsumer getConsumer()
return consumer;
* Set the Topic.
* @omitFromUI
public void setTopic(String topic)
* Set the ZooKeeper quorum of the Kafka cluster(s) you want to consume data from.
* The operator will discover the brokers that it needs to consume messages from.
* @omitFromUI
public void setZookeeper(String zookeeperString)
// Partitioner callback; stamps the time of this (re)partition, which isPartitionRequired()
// compares against repartitionInterval.
public void partitioned(Map<Integer, Partitioner.Partition<AbstractKafkaInputOperator<K>>> partitions)
// update the last repartition time
lastRepartitionTime = System.currentTimeMillis();
// Computes the operator partitions for the configured PartitionStrategy.
//  - ONE_TO_ONE: on the initial call creates one operator partition per kafka partition found
//    in the topic metadata; on later calls adds one operator partition per newly discovered
//    kafka partition.
//  - ONE_TO_MANY: (re)distributes all kafka partitions over at most initialPartitionCount
//    operator partitions using i % size bucketing; rejects the high-level consumer.
//  - ONE_TO_MANY_HEURISTIC: throws UnsupportedOperationException.
// When the partition set changed, the WindowDataManager is partitioned to match.
// Returns the unchanged input collection when no change is needed.
// NOTE(review): brace-only lines, some case labels, the logger.info( prefixes and a few
// statements are not visible in this extract; the code tokens are left untouched.
public Collection<Partitioner.Partition<AbstractKafkaInputOperator<K>>> definePartitions(Collection<Partitioner.Partition<AbstractKafkaInputOperator<K>>> partitions, Partitioner.PartitioningContext context)
// Initialize brokers from zookeepers
boolean isInitialParitition = true;
// check if it's the initial partition
// (a first partition without stats is taken to mean nothing has been deployed yet)
if(partitions.iterator().hasNext()) {
isInitialParitition = partitions.iterator().next().getStats() == null;
// Operator partitions
List<Partitioner.Partition<AbstractKafkaInputOperator<K>>> newPartitions = null;
// initialize the offset
Map<KafkaPartition, Long> initOffset = null;
// Initial offsets are only loaded on the very first partitioning and only if an OffsetManager is set.
if(isInitialParitition && offsetManager !=null){
initOffset = offsetManager.loadInitialOffsets();"Initial offsets: {} ", "{ " + Joiner.on(", ").useForNull("").withKeyValueSeparator(": ").join(initOffset) + " }");
// Ids of operator partitions that are being replaced; passed to windowDataManager.partition() below.
Set<Integer> deletedOperators = Sets.newHashSet();
Collection<Partition<AbstractKafkaInputOperator<K>>> resultPartitions = partitions;
boolean numPartitionsChanged = false;
switch (strategy) {
// For the 1 to 1 mapping The framework will create number of operator partitions based on kafka topic partitions
// Each operator partition will consume from only one kafka partition
case ONE_TO_ONE:
if (isInitialParitition) {
lastRepartitionTime = System.currentTimeMillis();"[ONE_TO_ONE]: Initializing partition(s)");
// get partition metadata for topics.
// Whatever operator is using high-level or simple kafka consumer, the operator always create a temporary simple kafka consumer to get the metadata of the topic
// The initial value of brokerList of the KafkaConsumer is used to retrieve the topic metadata
Map<String, List<PartitionMetadata>> kafkaPartitions = KafkaMetadataUtil.getPartitionsForTopic(getConsumer().brokers, getConsumer().getTopic());
// initialize the number of operator partitions according to number of kafka partitions
newPartitions = new LinkedList<Partitioner.Partition<AbstractKafkaInputOperator<K>>>();
// The metadata map is keyed by cluster id.
for (Map.Entry<String, List<PartitionMetadata>> kp : kafkaPartitions.entrySet()) {
String clusterId = kp.getKey();
for (PartitionMetadata pm : kp.getValue()) {"[ONE_TO_ONE]: Create operator partition for cluster {}, topic {}, kafka partition {} ", clusterId, getConsumer().topic, pm.partitionId());
newPartitions.add(createPartition(Sets.newHashSet(new KafkaPartition(clusterId, consumer.topic, pm.partitionId())), initOffset));
resultPartitions = newPartitions;
numPartitionsChanged = true;
else if (newWaitingPartition.size() != 0) {
// add partition for new kafka partition
// (existing operator partitions are kept; new ones start without initial offsets)
for (KafkaPartition newPartition : newWaitingPartition) {"[ONE_TO_ONE]: Add operator partition for cluster {}, topic {}, partition {}", newPartition.getClusterId(), getConsumer().topic, newPartition.getPartitionId());
partitions.add(createPartition(Sets.newHashSet(newPartition), null));
resultPartitions = partitions;
numPartitionsChanged = true;
// For the 1 to N mapping The initial partition number is defined by stream application
// Afterwards, the framework will dynamically adjust the partition and allocate consumers to as few operator partitions as it can
// and guarantee the total intake rate for each operator partition is below some threshold
if (getConsumer() instanceof HighlevelKafkaConsumer) {
throw new UnsupportedOperationException("[ONE_TO_MANY]: The high-level consumer is not supported for ONE_TO_MANY partition strategy.");
if (isInitialParitition || newWaitingPartition.size() != 0) {
lastRepartitionTime = System.currentTimeMillis();"[ONE_TO_MANY]: Initializing partition(s)");
// get partition metadata for topics.
// Whatever operator is using high-level or simple kafka consumer, the operator always create a temporary simple kafka consumer to get the metadata of the topic
// The initial value of brokerList of the KafkaConsumer is used to retrieve the topic metadata
Map<String, List<PartitionMetadata>> kafkaPartitions = KafkaMetadataUtil.getPartitionsForTopic(getConsumer().brokers, getConsumer().getTopic());
int size = initialPartitionCount;
// Generic array creation workaround: a reflective HashSet[] cast to Set<KafkaPartition>[].
Set<KafkaPartition>[] kps = (Set<KafkaPartition>[]) Array.newInstance((new HashSet<KafkaPartition>()).getClass(), size);
int i = 0;
for (Map.Entry<String, List<PartitionMetadata>> en : kafkaPartitions.entrySet()) {
String clusterId = en.getKey();
for (PartitionMetadata pm : en.getValue()) {
// Bucket i % size receives this kafka partition; buckets are created lazily.
// NOTE(review): the increment of i is not visible in this extract - confirm.
if (kps[i % size] == null) {
kps[i % size] = new HashSet<KafkaPartition>();
kps[i % size].add(new KafkaPartition(clusterId, consumer.topic, pm.partitionId()));
// Never create more operator partitions than i (presumably the number of kafka partitions seen).
size = i > size ? size : i;
newPartitions = new ArrayList<Partitioner.Partition<AbstractKafkaInputOperator<K>>>(size);
for (i = 0; i < size; i++) {"[ONE_TO_MANY]: Create operator partition for kafka partition(s): {} ", StringUtils.join(kps[i], ", "));
newPartitions.add(createPartition(kps[i], initOffset));
// Add the existing partition Ids to the deleted operators
// NOTE(review): the loop body is not visible in this extract.
for (Partition<AbstractKafkaInputOperator<K>> op : partitions)
resultPartitions = newPartitions;
numPartitionsChanged = true;
throw new UnsupportedOperationException("[ONE_TO_MANY_HEURISTIC]: Not implemented yet");
if (numPartitionsChanged) {
// One WindowDataManager per resulting partition.
// NOTE(review): the assignment inside the loop below is not visible in this extract.
List<WindowDataManager> managers = windowDataManager.partition(resultPartitions.size(), deletedOperators);
int i = 0;
for (Partition<AbstractKafkaInputOperator<K>> partition : resultPartitions) {
return resultPartitions;
* Create a new partition with the partition Ids and initial offset positions
* @deprecated use {@link #createPartition(Set, Map)}
// Legacy overload kept for compatibility: the newManagers argument is ignored and the call
// is forwarded to createPartition(pIds, initOffsets).
protected Partitioner.Partition<AbstractKafkaInputOperator<K>> createPartition(Set<KafkaPartition> pIds,
Map<KafkaPartition, Long> initOffsets,
@SuppressWarnings("UnusedParameters") Collection<WindowDataManager> newManagers)
return createPartition(pIds, initOffsets);
// Create a new partition with the partition Ids and initial offset positions
// Builds one operator partition: a Kryo clone of this operator wrapped in a DefaultPartition.
// For the simple consumer, the clone's consumer is pointed at the given kafka partitions and
// initial offsets. A PartitionInfo describing the assigned kafka partitions is also created.
// pIds        kafka partitions the new operator partition consumes from
// initOffsets initial offsets per kafka partition; may be null (callers in this class pass null
//             for partitions added after the initial deployment)
protected Partitioner.Partition<AbstractKafkaInputOperator<K>> createPartition(Set<KafkaPartition> pIds,
Map<KafkaPartition, Long> initOffsets)
Partitioner.Partition<AbstractKafkaInputOperator<K>> p = new DefaultPartition<>(KryoCloneUtils.cloneObject(this));
if (p.getPartitionedInstance().getConsumer() instanceof SimpleKafkaConsumer) {
p.getPartitionedInstance().getConsumer().resetPartitionsAndOffset(pIds, initOffsets);
// NOTE(review): the body of this branch is not visible in this extract.
if (initOffsets != null) {
//Don't send all offsets to all partitions
PartitionInfo pif = new PartitionInfo();
pif.kpids = pIds;
// NOTE(review): pif is presumably appended to currentPartitionInfo; that statement is not visible - confirm.
return p;
// StatsListener callback: extracts the kafka meter stats from the batched operator stats and
// sets repartitionRequired on the response according to isPartitionRequired().
public StatsListener.Response processStats(StatsListener.BatchedOperatorStats stats)
StatsListener.Response resp = new StatsListener.Response();
List<KafkaConsumer.KafkaMeterStats> kstats = extractKafkaStats(stats);
resp.repartitionRequired = isPartitionRequired(stats.getOperatorId(), kstats);
return resp;
// Forwards the offsets found in the reported stats to the OffsetManager, if one is configured
// and there is at least one offset to report.
// NOTE(review): the actual call on offsetManager is not visible in this extract.
private void updateOffsets(List<KafkaConsumer.KafkaMeterStats> kstats)
//In every partition check interval, call offsetmanager to update the offsets
if (offsetManager != null) {
Map<KafkaPartition, Long> offsetsForPartitions = getOffsetsForPartitions(kstats);
if (offsetsForPartitions.size() > 0) {
logger.debug("Passing offset updates to offset manager");
// Collects the KafkaMeterStats counters from the last windowed stats of an operator.
// Null entries and entries whose counters are not KafkaMeterStats are skipped, so the
// returned list may be empty but is never null.
private List<KafkaConsumer.KafkaMeterStats> extractKafkaStats(StatsListener.BatchedOperatorStats stats)
//preprocess the stats
List<KafkaConsumer.KafkaMeterStats> kmsList = new LinkedList<KafkaConsumer.KafkaMeterStats>();
for (Stats.OperatorStats os : stats.getLastWindowedStats()) {
if (os != null && os.counters instanceof KafkaConsumer.KafkaMeterStats) {
kmsList.add((KafkaConsumer.KafkaMeterStats) os.counters);
return kmsList;
* Check whether the operator needs repartition based on reported stats
* @return true if repartition is required
* false if repartition is not required
// Decides whether a repartition should be requested. The checks run in this order:
//   1. the stats check is rate-limited by repartitionCheckInterval,
//   2. a negative repartitionInterval disables repartitioning,
//   3. repartitions are rate-limited by repartitionInterval,
//   4. stats must have been collected from every current operator partition,
//   5. the topic metadata is compared with the known kafka partitions; true is returned only
//      when newWaitingPartition is non-empty afterwards.
// lastCheckTime is refreshed in the finally block however the metadata check exits.
// opid   id of the operator the stats belong to
// kstats kafka meter stats extracted for that operator
// NOTE(review): brace-only lines and a few statements (the offset update call, filling
// existingIds and newWaitingPartition) are not visible in this extract.
private boolean isPartitionRequired(int opid, List<KafkaConsumer.KafkaMeterStats> kstats)
long t = System.currentTimeMillis();
// If stats are available then update offsets
// Do this before re-partition interval check below to not miss offset updates
if (kstats.size() > 0) {
logger.debug("Checking offset updates for offset manager");
if (t - lastCheckTime < repartitionCheckInterval) {
// return false if it's within repartitionCheckInterval since last time it check the stats
return false;
if(repartitionInterval < 0){
// if repartition is disabled
return false;
if(t - lastRepartitionTime < repartitionInterval) {
// return false if it's still within repartitionInterval since last (re)partition
return false;
// Remember the latest stats per operator id; the size of this map is compared with the
// partition count below.
kafkaStatsHolder.put(opid, kstats);
if (kafkaStatsHolder.size() != currentPartitionInfo.size() || currentPartitionInfo.size() == 0) {
// skip checking if the operator hasn't collected all the stats from all the current partitions
return false;
try {
// monitor if new kafka partition added
Set<KafkaPartition> existingIds = new HashSet<KafkaPartition>();
for (PartitionInfo pio : currentPartitionInfo) {
Map<String, List<PartitionMetadata>> partitionsMeta = KafkaMetadataUtil.getPartitionsForTopic(consumer.brokers, consumer.getTopic());
if(partitionsMeta == null){
//broker(s) has temporary issue to get metadata
return false;
for (Map.Entry<String, List<PartitionMetadata>> en : partitionsMeta.entrySet()) {
if(en.getValue() == null){
//broker(s) has temporary issue to get metadata
for (PartitionMetadata pm : en.getValue()) {
KafkaPartition pa = new KafkaPartition(en.getKey(), consumer.topic, pm.partitionId());
if (newWaitingPartition.size() != 0) {
// found new kafka partition
lastRepartitionTime = t;
return true;
return false;
} finally {
// update last check time
lastCheckTime = System.currentTimeMillis();
public static enum PartitionStrategy
* Each operator partition connect to only one kafka partition
* Each operator consumes from several kafka partitions with overall input rate under some certain hard limit in msgs/s or bytes/s
* For now it <b>only</b> support <b>simple kafka consumer</b>
* 1 to N partition based on the heuristic function
* <b>NOT</b> implemented yet
* TODO implement this later
// Bookkeeping record for one operator partition, as tracked in currentPartitionInfo.
static class PartitionInfo
// Kafka partitions assigned to the operator partition (set in createPartition()).
Set<KafkaPartition> kpids;
// NOTE(review): not read or written anywhere in the visible code; presumably the remaining
// message-rate and byte-rate budget - confirm.
long msgRateLeft;
long byteRateLeft;
// Returns the window data manager used for replay bookkeeping.
public WindowDataManager getWindowDataManager()
return windowDataManager;
// Replaces the window data manager. setup() rejects anything but the no-op manager when a
// high-level consumer is in use.
public void setWindowDataManager(WindowDataManager windowDataManager)
this.windowDataManager = windowDataManager;
// Sets the number of buckets used by the ONE_TO_MANY strategy in definePartitions().
public void setInitialPartitionCount(int partitionCount)
this.initialPartitionCount = partitionCount;
// Returns the configured initial partition count.
public int getInitialPartitionCount()
return initialPartitionCount;
// Returns the message-rate bound (the backing field is marked deprecated).
public long getMsgRateUpperBound()
return msgRateUpperBound;
// Sets the message-rate bound (the backing field is marked deprecated).
public void setMsgRateUpperBound(long msgRateUpperBound)
this.msgRateUpperBound = msgRateUpperBound;
// Returns the byte-rate bound (the backing field is marked deprecated).
public long getByteRateUpperBound()
return byteRateUpperBound;
// Sets the byte-rate bound (the backing field is marked deprecated).
public void setByteRateUpperBound(long byteRateUpperBound)
this.byteRateUpperBound = byteRateUpperBound;
// Writes straight through to the consumer's initialOffset field; nothing is stored on the operator.
public void setInitialOffset(String initialOffset)
this.consumer.initialOffset = initialOffset;
// Sets the optional OffsetManager that supplies the initial offsets.
public void setOffsetManager(OffsetManager offsetManager)
this.offsetManager = offsetManager;
// Returns the configured OffsetManager, or null if none was set.
public OffsetManager getOffsetManager()
return offsetManager;
// Sets the minimal interval, in milliseconds, between two stats checks in isPartitionRequired().
public void setRepartitionCheckInterval(long repartitionCheckInterval)
this.repartitionCheckInterval = repartitionCheckInterval;
// Returns the minimal interval between two stats checks.
public long getRepartitionCheckInterval()
return repartitionCheckInterval;
// Sets the minimal interval, in milliseconds, between two repartitions; a negative value
// disables repartitioning (see isPartitionRequired()).
public void setRepartitionInterval(long repartitionInterval)
this.repartitionInterval = repartitionInterval;
// Returns the minimal interval between two repartitions.
public long getRepartitionInterval()
return repartitionInterval;
// Sets the partition strategy from its name, case-insensitively (e.g. "one_to_one").
// The name is upper-cased with Locale.ROOT so the lookup does not depend on the JVM's
// default locale: under a Turkish locale the no-arg toUpperCase() maps 'i' to a dotted
// capital I, and valueOf() would then reject otherwise valid names such as "one_to_many_heuristic".
// Throws IllegalArgumentException if the name matches no PartitionStrategy constant and
// NullPointerException if policy is null (unchanged from before).
public void setStrategy(String policy)
this.strategy = PartitionStrategy.valueOf(policy.toUpperCase(java.util.Locale.ROOT));