blob: 40747eb4dd2e9b66681cde9841ac76461340c873 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.malhar.kafka;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;

import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.StatsListener;
import com.datatorrent.netlet.util.DTThrowable;
/**
 * The abstract Kafka input operator using the Kafka 0.9.0 new consumer API.
 * A scalable, fault-tolerant, at-least-once Kafka input operator.
 * Key features include:
 * <ol>
 * <li>Out-of-the-box one-to-one and one-to-many partition strategy support, plus customizable partition
 * strategies; refer to AbstractKafkaPartitioner</li>
 * <li>Fault tolerance: when the input operator goes down, it is redeployed on another node</li>
 * <li>At-least-once semantics for operator failure (no matter which operator fails)</li>
 * <li>At-least-once semantics for cold restart (no data loss even if you restart the application)</li>
 * <li>Multi-cluster support: one operator can consume data from more than one Kafka cluster</li>
 * <li>Multi-topic support: one operator can subscribe to multiple topics</li>
 * <li>Throughput control support: you can throttle the number of tuples emitted in each streaming window</li>
 * </ol>
 *
 * @since 3.3.0
 */
public abstract class AbstractKafkaInputOperator implements InputOperator,
Operator.ActivationListener<Context.OperatorContext>, Operator.CheckpointNotificationListener,
Partitioner<AbstractKafkaInputOperator>, StatsListener, OffsetCommitCallback
// Besides emitting tuples (InputOperator), this class reacts to activation and checkpoint notifications,
// drives its own dynamic partitioning (Partitioner + StatsListener) and receives the results of
// asynchronous Kafka offset commits (OffsetCommitCallback).
// NOTE(review): InterfaceStability is imported but no annotation using it is visible in this copy; a
// class-level stability annotation may have been lost — confirm.
private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaInputOperator.class);
static {
// We create new consumers periodically to pull metadata (Kafka consumer keeps metadata in cache)
// Skip log4j log for ConsumerConfig class to avoid too much noise in application
/**
 * Where the operator starts consuming when the application (re)starts.
 * Parsed case-insensitively from configuration by setInitialOffset.
 */
public enum InitialOffset
{
  /** Consume from the beginning of the partition every time the application restarts. */
  EARLIEST,
  /** Consume from the latest offset of the partition every time the application restarts. */
  LATEST,
  /** Consume from the committed position of the last run, or from the earliest offset if nothing was committed. */
  APPLICATION_OR_EARLIEST,
  /** Consume from the committed position of the last run, or from the latest offset if nothing was committed. */
  APPLICATION_OR_LATEST
}
private String[] clusters;
private String[] topics;
* offset track for checkpoint
private final Map<AbstractKafkaPartitioner.PartitionMeta, Long> offsetTrack = new HashMap<>();
private final transient Map<AbstractKafkaPartitioner.PartitionMeta, Long> windowStartOffset = new HashMap<>();
private transient int operatorId;
private int initialPartitionCount = 1;
private long repartitionInterval = 30000L;
private long repartitionCheckInterval = 5000L;
private int maxTuplesPerWindow = Integer.MAX_VALUE;
* By default the operator start consuming from the committed offset or the latest one
private InitialOffset initialOffset = InitialOffset.APPLICATION_OR_LATEST;
private long metricsRefreshInterval = 5000L;
private long consumerTimeout = 5000L;
private int holdingBufferSize = 1024;
private Properties consumerProps = new Properties();
* Assignment for each operator instance
private Set<AbstractKafkaPartitioner.PartitionMeta> assignment;
//=======================All transient fields==========================
* Wrapper consumer object
* It wraps KafkaConsumer, maintains consumer thread and store messages in a queue
private final transient KafkaConsumerWrapper consumerWrapper = createConsumerWrapper();
* By default the strategy is one to one
* @see PartitionStrategy
private PartitionStrategy strategy = PartitionStrategy.ONE_TO_ONE;
* count the emitted message in each window<br>
* non settable
private transient int emitCount = 0;
* store offsets with window id, only keep offsets with windows that have not been committed
private final transient List<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> offsetHistory =
new LinkedList<>();
* Application name is used as for kafka consumer
private transient String applicationName;
private transient AbstractKafkaPartitioner partitioner;
private transient long currentWindowId;
private transient long lastCheckTime = 0L;
private transient long lastRepartitionTime = 0L;
private transient KafkaMetrics metrics;
private WindowDataManager windowDataManager = new WindowDataManager.NoopWindowDataManager();
* Creates the Wrapper consumer object
* It maintains consumer thread and store messages in a queue
* @return KafkaConsumerWrapper
public KafkaConsumerWrapper createConsumerWrapper()
return new KafkaConsumerWrapper();
/**
 * Creates the consumer object, which wraps KafkaConsumer. Implemented by concrete subclasses.
 *
 * @param prop the consumer properties to configure the new consumer with
 * @return the new consumer
 */
public abstract AbstractKafkaConsumer createConsumer(Properties prop);
// NOTE(review): none of the four callbacks below has a visible body in this copy of the file; confirm what
// each one should contain before compiling.
// ActivationListener callback, invoked with the operator context.
public void activate(Context.OperatorContext context)
// ActivationListener callback.
public void deactivate()
// CheckpointNotificationListener callback (the parameter is presumably a window id — confirm).
public void checkpointed(long l)
// CheckpointNotificationListener callback, presumably invoked before the checkpoint of windowId is taken.
public void beforeCheckpoint(long windowId)
// CheckpointNotificationListener callback for the committed window windowId. It walks offsetHistory, whose
// entries are (window id, offset snapshot) pairs appended by endWindow.
// NOTE(review): several statements of this method are not visible in this copy: the body of the
// LATEST/EARLIEST branch, the iterator advance, the per-entry actions and both try/catch bodies.
public void committed(long windowId)
// LATEST and EARLIEST restart positions do not depend on committed offsets; the statement guarded here is
// not visible (presumably an early return — confirm).
if (initialOffset == InitialOffset.LATEST || initialOffset == InitialOffset.EARLIEST) {
// Ask the Kafka consumer wrapper to store the committed offsets.
for (Iterator<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> iter =
offsetHistory.iterator(); iter.hasNext(); ) {
// NOTE(review): the right-hand side of this assignment is missing; presumably iter.next().
Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>> item =;
// Only history entries up to and including the committed window are acted on.
if (item.getLeft() <= windowId) {
// The snapshot taken at the end of exactly this window holds the offsets reached by it.
if (item.getLeft() == windowId) {
// In idempotent mode the window data manager is presumably told about the commit as well (not visible).
if (isIdempotent()) {
try {
} catch (IOException e) {
public void emitTuples()
int count = consumerWrapper.messageSize();
if (maxTuplesPerWindow > 0) {
count = Math.min(count, maxTuplesPerWindow - emitCount);
for (int i = 0; i < count; i++) {
Pair<String, ConsumerRecord<byte[], byte[]>> tuple = consumerWrapper.pollMessage();
ConsumerRecord<byte[], byte[]> msg = tuple.getRight();
emitTuple(tuple.getLeft(), msg);
AbstractKafkaPartitioner.PartitionMeta pm = new AbstractKafkaPartitioner.PartitionMeta(tuple.getLeft(),
msg.topic(), msg.partition());
offsetTrack.put(pm, msg.offset() + 1);
if (isIdempotent() && !windowStartOffset.containsKey(pm)) {
windowStartOffset.put(pm, msg.offset());
emitCount += count;
/**
 * Emits one Kafka record downstream; implemented by concrete subclasses.
 * emitTuples() calls this for every record polled from the consumer wrapper, before recording the
 * record's offset in the offset track.
 *
 * @param cluster the cluster the record was read from
 * @param message the raw Kafka record, with byte[] key and value
 */
protected abstract void emitTuple(String cluster, ConsumerRecord<byte[], byte[]> message);
// Operator callback at the start of each streaming window: resets the per-window emit counter used by
// maxTuplesPerWindow throttling and remembers the window id.
public void beginWindow(long wid)
emitCount = 0;
currentWindowId = wid;
// In idempotent mode, windows at or below the largest completed window known to the window data manager
// are replays of already-processed windows.
// NOTE(review): the statements inside both branches are not visible in this copy (presumably replay(wid)
// in the first branch — confirm).
if (isIdempotent() && wid <= windowDataManager.getLargestCompletedWindow()) {
} else {
// Replays window windowId from the data the window data manager stored for it.
// NOTE(review): windowData is retrieved, but no statement using it and no catch body are visible in this
// copy — confirm before compiling.
private void replay(long windowId)
try {
// Per partition: (first offset emitted, number of offsets emitted), as built in endWindow.
Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>> windowData =
(Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>>)windowDataManager.retrieve(windowId);
} catch (IOException e) {
// Operator callback at the end of each streaming window: snapshots the offsets reached, refreshes the
// metrics and, in idempotent mode, hands the window's per-partition data to the window data manager.
public void endWindow()
// Copy the current offset track into the history, keyed by this window id (consumed by committed()).
Map<AbstractKafkaPartitioner.PartitionMeta, Long> offsetsWithWindow = new HashMap<>(offsetTrack);
offsetHistory.add(Pair.of(currentWindowId, offsetsWithWindow));
// Update metrics.
metrics.updateMetrics(clusters, consumerWrapper.getAllConsumerMetrics());
// Update the window data manager.
if (isIdempotent()) {
try {
// Per partition: (first offset emitted, number of offsets emitted). offsetTrack holds one past the last
// emitted offset, so the difference is the number of offsets covered.
Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>> windowData = new HashMap<>();
for (Map.Entry<AbstractKafkaPartitioner.PartitionMeta, Long> e : windowStartOffset.entrySet()) {
windowData.put(e.getKey(), new MutablePair<>(e.getValue(), offsetTrack.get(e.getKey()) - e.getValue()));
// NOTE(review): only the tail "}, currentWindowId);" of the next statement is visible in this copy;
// presumably it is a windowDataManager save call for currentWindowId — confirm.
// NOTE(review): windowStartOffset is never cleared in the visible code; unless it is cleared after the
// save, later windows would reuse the start offsets of an earlier window — confirm.
}, currentWindowId);
} catch (IOException e) {
// Component setup: captures the application name, creates the metrics holder with metricsRefreshInterval
// and records the physical operator id.
// NOTE(review): further initialisation may be missing from this copy of the method — confirm against the
// upstream source.
public void setup(Context.OperatorContext context)
applicationName = context.getValue(Context.DAGContext.APPLICATION_NAME);
metrics = new KafkaMetrics(metricsRefreshInterval);
operatorId = context.getId();
// Component teardown.
// NOTE(review): no body is visible for this method in this copy of the file.
public void teardown()
// Creates the partitioner for the configured strategy if it does not exist yet. partitioner is transient
// and is assigned only here, so this must run before partitioner is used.
// NOTE(review): statements are missing from this copy. These include the logging call that takes
// "Initialize Partitioner", the case labels and break statements between the assignments, and the
// logging call that takes "Actual Partitioner is {}". As written, the switch would not compile.
private void initPartitioner()
if (partitioner == null) {"Initialize Partitioner");
switch (strategy) {
case ONE_TO_ONE:
partitioner = new OneToOnePartitioner(clusters, topics, this);
// NOTE(review): a break and a case label (presumably ONE_TO_MANY) are missing before this line.
partitioner = new OneToManyPartitioner(clusters, topics, this);
// NOTE(review): a break and a case label for the unimplemented strategy are missing before this line.
throw new UnsupportedOperationException("Not implemented yet");
// NOTE(review): presumably the default: label belongs before this line.
throw new RuntimeException("Invalid strategy");
}"Actual Partitioner is {}", partitioner.getClass());
public Response processStats(BatchedOperatorStats batchedOperatorStats)
long t = System.currentTimeMillis();
if (repartitionInterval < 0 || repartitionCheckInterval < 0 ||
t - lastCheckTime < repartitionCheckInterval || t - lastRepartitionTime < repartitionInterval) {
// return false if it's within repartitionCheckInterval since last time it check the stats
Response response = new Response();
response.repartitionRequired = false;
return response;
try {
logger.debug("Process stats");
return partitioner.processStats(batchedOperatorStats);
} finally {
lastCheckTime = System.currentTimeMillis();
public Collection<Partition<AbstractKafkaInputOperator>> definePartitions(
Collection<Partition<AbstractKafkaInputOperator>> collection, PartitioningContext partitioningContext)
logger.debug("Define partitions");
return partitioner.definePartitions(collection, partitioningContext);
public void partitioned(Map<Integer, Partition<AbstractKafkaInputOperator>> map)
// update the last repartition time
lastRepartitionTime = System.currentTimeMillis();
* A callback from consumer after it commits the offset
* @param map
* @param e
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e)
if (logger.isDebugEnabled()) {
logger.debug("Commit offsets complete {} ", Joiner.on(';').withKeyValueSeparator("=").join(map));
if (e != null) {
logger.warn("Exceptions in committing offsets {} : {} ",
Joiner.on(';').withKeyValueSeparator("=").join(map), e);
public void assign(Set<AbstractKafkaPartitioner.PartitionMeta> assignment)
this.assignment = assignment;
public Set<AbstractKafkaPartitioner.PartitionMeta> assignment()
return assignment;
private boolean isIdempotent()
return windowDataManager != null && !(windowDataManager instanceof WindowDataManager.NoopWindowDataManager);
//---------------------------------------------setters and getters----------------------------------------
public void setInitialPartitionCount(int partitionCount)
this.initialPartitionCount = partitionCount;
* initial partition count
* only used with PartitionStrategy.ONE_TO_MANY
* or customized strategy
public int getInitialPartitionCount()
return initialPartitionCount;
public void setClusters(String clusters)
this.clusters = clusters.split(";");
* Same setting as bootstrap.servers property to KafkaConsumer
* refer to
* To support multi cluster, you can have multiple bootstrap.servers separated by ";"
public String getClusters()
return Joiner.on(';').join(clusters);
public void setTopics(String topics)
this.topics = Iterables.toArray(Splitter.on(',').trimResults().omitEmptyStrings().split(topics), String.class);
* The topics the operator consumes, separate by','
* Topic name can only contain ASCII alphanumerics, '.', '_' and '-'
public String getTopics()
return Joiner.on(", ").join(topics);
public void setStrategy(String policy)
this.strategy = PartitionStrategy.valueOf(policy.toUpperCase());
public String getStrategy()
public void setInitialOffset(String initialOffset)
this.initialOffset = InitialOffset.valueOf(initialOffset.toUpperCase());
* Initial offset, it should be one of the following
* <ul>
* <li>earliest</li>
* <li>latest</li>
* <li>application_or_earliest</li>
* <li>application_or_latest</li>
* </ul>
public String getInitialOffset()
public String getApplicationName()
return applicationName;
public void setConsumerProps(Properties consumerProps)
this.consumerProps = consumerProps;
* Extra kafka consumer properties
* Please be aware that the properties below are set by the operator, don't override it
* <ul>
* <li>bootstrap.servers</li>
* <li></li>
* <li>auto.offset.reset</li>
* <li></li>
* <li>partition.assignment.strategy</li>
* <li>key.deserializer</li>
* <li>value.deserializer</li>
* </ul>
public Properties getConsumerProps()
return consumerProps;
public void setMaxTuplesPerWindow(int maxTuplesPerWindow)
this.maxTuplesPerWindow = maxTuplesPerWindow;
* maximum tuples allowed to be emitted in each window
public int getMaxTuplesPerWindow()
return maxTuplesPerWindow;
* @see <a href="">
* org.apache.kafka.clients.consumer.KafkaConsumer.poll</a>
public long getConsumerTimeout()
return consumerTimeout;
public void setConsumerTimeout(long consumerTimeout)
this.consumerTimeout = consumerTimeout;
* Number of messages kept in memory waiting for emission to downstream operator
public int getHoldingBufferSize()
return holdingBufferSize;
public void setHoldingBufferSize(int holdingBufferSize)
this.holdingBufferSize = holdingBufferSize;
* metrics refresh interval
public long getMetricsRefreshInterval()
return metricsRefreshInterval;
public void setMetricsRefreshInterval(long metricsRefreshInterval)
this.metricsRefreshInterval = metricsRefreshInterval;
public void setRepartitionCheckInterval(long repartitionCheckInterval)
this.repartitionCheckInterval = repartitionCheckInterval;
* Minimal interval between checking collected stats and decide whether it needs to repartition or not.
* And minimal interval between 2 offset updates
public long getRepartitionCheckInterval()
return repartitionCheckInterval;
public void setRepartitionInterval(long repartitionInterval)
this.repartitionInterval = repartitionInterval;
* Minimal interval between 2 (re)partition actions
public long getRepartitionInterval()
return repartitionInterval;
public void setWindowDataManager(WindowDataManager windowDataManager)
this.windowDataManager = windowDataManager;
public WindowDataManager getWindowDataManager()
return windowDataManager;
* @return current checkpointed offsets
* @omitFromUI
public Map<AbstractKafkaPartitioner.PartitionMeta, Long> getOffsetTrack()
return offsetTrack;