blob: 40747eb4dd2e9b66681cde9841ac76461340c873 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.kafka;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;

import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;

import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.StatsListener;
import com.datatorrent.netlet.util.DTThrowable;
/**
* The abstract Kafka input operator, built on the new consumer API introduced in Kafka 0.9.0.
* A scalable, fault-tolerant, at-least-once Kafka input operator.
* Key features include:
*
* <ol>
* <li>Out-of-the-box one-to-one and one-to-many partition strategies, plus support for custom partition
* strategies; refer to AbstractKafkaPartitioner</li>
* <li>Fault tolerance: when the input operator goes down, it is redeployed on another node</li>
* <li>At-least-once semantics for operator failure (no matter which operator fails)</li>
* <li>At-least-once semantics for cold restart (no data loss even if you restart the application)</li>
* <li>Multi-cluster support: one operator can consume data from more than one Kafka cluster</li>
* <li>Multi-topic support: one operator can subscribe to multiple topics</li>
* <li>Throughput control: you can throttle the number of tuples emitted in each streaming window</li>
* </ol>
*
* @since 3.3.0
*/
@InterfaceStability.Evolving
public abstract class AbstractKafkaInputOperator implements InputOperator,
Operator.ActivationListener<Context.OperatorContext>, Operator.CheckpointNotificationListener,
Partitioner<AbstractKafkaInputOperator>, StatsListener, OffsetCommitCallback
{
// SLF4J logger shared by all instances of this operator
private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaInputOperator.class);
static {
// We create new consumers periodically to pull metadata (Kafka consumer keeps metadata in cache)
// Skip log4j log for ConsumerConfig class to avoid too much noise in application:
// the log4j level of the ConsumerConfig logger is set to WARN when this class is initialized.
LogManager.getLogger(ConsumerConfig.class).setLevel(Level.WARN);
}
/**
* Where the operator starts consuming when the application (re)starts.
* The value is selected by name through
* {@link AbstractKafkaInputOperator#setInitialOffset(String)}.
*/
public enum InitialOffset
{
EARLIEST, // consume from the beginning of the partition every time the application restarts
LATEST, // consume from the latest offset of the partition every time the application restarts
// consume from the position committed by the last run, or from earliest if there is no committed offset(s)
APPLICATION_OR_EARLIEST,
APPLICATION_OR_LATEST // consume from the position committed by the last run, or from latest if there is no committed offset(s)
}
// Kafka clusters to read from; populated by setClusters(String), one entry per ';'-separated value
@NotNull
private String[] clusters;
// Topics to read; populated by setTopics(String), one entry per ','-separated value
@NotNull
private String[] topics;
/**
* offset track for checkpoint
* For every partition a message was emitted from, holds the offset following the last emitted
* message (emitTuples stores offset + 1). Not transient.
*/
private final Map<AbstractKafkaPartitioner.PartitionMeta, Long> offsetTrack = new HashMap<>();
// Offset of the first message emitted per partition in the current window; cleared in beginWindow and
// only filled while the operator is idempotent
private final transient Map<AbstractKafkaPartitioner.PartitionMeta, Long> windowStartOffset = new HashMap<>();
// Operator id taken from the operator context in setup
private transient int operatorId;
private int initialPartitionCount = 1;
// Both intervals are compared against System.currentTimeMillis() differences in processStats, i.e. they
// are in milliseconds; a negative value makes processStats always answer "no repartition required"
private long repartitionInterval = 30000L;
private long repartitionCheckInterval = 5000L;
// Upper bound on the number of tuples emitted per window (enforced in emitTuples)
@Min(1)
private int maxTuplesPerWindow = Integer.MAX_VALUE;
/**
* By default the operator starts consuming from the committed offset or the latest one
*/
private InitialOffset initialOffset = InitialOffset.APPLICATION_OR_LATEST;
// Passed to the KafkaMetrics constructor in setup
private long metricsRefreshInterval = 5000L;
// NOTE(review): presumably the KafkaConsumer.poll timeout (see getConsumerTimeout) - confirm in KafkaConsumerWrapper
private long consumerTimeout = 5000L;
// Number of messages kept in memory waiting for emission (see getHoldingBufferSize)
private int holdingBufferSize = 1024;
// Extra Kafka consumer properties supplied by the user
private Properties consumerProps = new Properties();
/**
* Assignment for each operator instance
*/
private Set<AbstractKafkaPartitioner.PartitionMeta> assignment;
//=======================Mostly transient (runtime-only) fields==========================
// NOTE: 'strategy' and 'windowDataManager' below are NOT declared transient
/**
* Wrapper consumer object
* It wraps KafkaConsumer, maintains the consumer thread and stores messages in a queue
*/
private final transient KafkaConsumerWrapper consumerWrapper = createConsumerWrapper();
/**
* By default the strategy is one to one
*
* @see PartitionStrategy
*/
private PartitionStrategy strategy = PartitionStrategy.ONE_TO_ONE;
/**
* counts the emitted messages in each window (reset in beginWindow)<br>
* non settable
*/
private transient int emitCount = 0;
/**
* stores offsets with window id, only keeps offsets of windows that have not been committed
*/
private final transient List<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> offsetHistory =
new LinkedList<>();
/**
* Application name is used as group.id for kafka consumer
*/
private transient String applicationName;
// Created lazily by initPartitioner according to 'strategy'
private transient AbstractKafkaPartitioner partitioner;
// Id of the window currently being processed (set in beginWindow)
private transient long currentWindowId;
// Wall-clock time (ms) when processStats last delegated to the partitioner
private transient long lastCheckTime = 0L;
// Wall-clock time (ms) of the last call to partitioned
private transient long lastRepartitionTime = 0L;
@AutoMetric
private transient KafkaMetrics metrics;
// The default no-op manager means the operator is not idempotent (see isIdempotent)
private WindowDataManager windowDataManager = new WindowDataManager.NoopWindowDataManager();
/**
 * Factory method for the consumer wrapper held by this operator.
 * The wrapper maintains the consumer thread and stores messages in a queue.
 * It is invoked from the field initializer of {@code consumerWrapper}, i.e. while the operator
 * instance is being constructed.
 *
 * @return a new KafkaConsumerWrapper
 */
public KafkaConsumerWrapper createConsumerWrapper()
{
  final KafkaConsumerWrapper wrapper = new KafkaConsumerWrapper();
  return wrapper;
}
/**
* Creates the consumer object; it wraps KafkaConsumer.
* Concrete operators supply the implementation.
*
* @param prop properties for the consumer (presumably the Kafka consumer configuration - confirm against callers)
* @return the consumer instance
*/
public abstract AbstractKafkaConsumer createConsumer(Properties prop);
/**
 * Starts the consumer wrapper, telling it whether this operator currently runs in idempotent mode
 * (as reported by {@code isIdempotent()}).
 *
 * @param context operator context; not used by this method
 */
@Override
public void activate(Context.OperatorContext context)
{
  final boolean idempotent = isIdempotent();
  consumerWrapper.start(idempotent);
}
/**
 * Stops the consumer wrapper that was started in {@code activate}.
 */
@Override
public void deactivate()
{
  this.consumerWrapper.stop();
}
/**
* Checkpoint notification; intentionally a no-op for this operator.
*
* @param l id of the checkpointed window (unused)
*/
@Override
public void checkpointed(long l)
{
}
/**
* Pre-checkpoint notification; intentionally a no-op for this operator.
*
* @param windowId id of the window about to be checkpointed (unused)
*/
@Override
public void beforeCheckpoint(long windowId)
{
}
/**
 * Handles the "window committed" notification.
 * <ul>
 * <li>Drops every entry of {@code offsetHistory} whose window id is not greater than {@code windowId}.
 * This always happens, because {@code endWindow} appends to the history unconditionally; skipping the
 * cleanup would let the list grow without bound.</li>
 * <li>Asks the consumer wrapper to commit the offsets recorded for exactly {@code windowId}, unless the
 * initial offset is {@link InitialOffset#EARLIEST} or {@link InitialOffset#LATEST}: those modes start
 * from the beginning/latest on every restart, so committed offsets are not needed.</li>
 * <li>When the operator is idempotent, forwards the notification to the window data manager.</li>
 * </ul>
 *
 * @param windowId id of the committed window
 */
@Override
public void committed(long windowId)
{
  final boolean commitToKafka =
      initialOffset != InitialOffset.LATEST && initialOffset != InitialOffset.EARLIEST;

  for (Iterator<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> iter =
      offsetHistory.iterator(); iter.hasNext(); ) {
    Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>> item = iter.next();
    long historyWindowId = item.getLeft();
    if (historyWindowId > windowId) {
      // window not committed yet, keep its offsets
      continue;
    }
    if (commitToKafka && historyWindowId == windowId) {
      //ask kafka consumer wrapper to store the committed offsets
      consumerWrapper.commitOffsets(item.getRight());
    }
    iter.remove();
  }

  if (isIdempotent()) {
    try {
      windowDataManager.committed(windowId);
    } catch (IOException e) {
      DTThrowable.rethrow(e);
    }
  }
}
/**
 * Emits the messages currently buffered by the consumer wrapper, limited by what is left of the
 * per-window budget ({@code maxTuplesPerWindow - emitCount}).
 * For each emitted message the next offset to read (offset + 1) is recorded in {@code offsetTrack};
 * in idempotent mode the first offset seen per partition in this window is also remembered in
 * {@code windowStartOffset}.
 */
@Override
public void emitTuples()
{
  int budget = consumerWrapper.messageSize();
  if (maxTuplesPerWindow > 0) {
    budget = Math.min(budget, maxTuplesPerWindow - emitCount);
  }

  int emitted = 0;
  while (emitted < budget) {
    final Pair<String, ConsumerRecord<byte[], byte[]>> polled = consumerWrapper.pollMessage();
    final String cluster = polled.getLeft();
    final ConsumerRecord<byte[], byte[]> message = polled.getRight();
    emitTuple(cluster, message);

    final AbstractKafkaPartitioner.PartitionMeta partition =
        new AbstractKafkaPartitioner.PartitionMeta(cluster, message.topic(), message.partition());
    final long offset = message.offset();
    offsetTrack.put(partition, offset + 1);
    if (isIdempotent() && !windowStartOffset.containsKey(partition)) {
      windowStartOffset.put(partition, offset);
    }
    emitted++;
  }

  emitCount += budget;
}
/**
* Emits a single Kafka message downstream; called by emitTuples for every message taken from the consumer wrapper.
*
* @param cluster the cluster string associated with the message (left element of the polled pair)
* @param message the raw Kafka record with byte[] key and value
*/
protected abstract void emitTuple(String cluster, ConsumerRecord<byte[], byte[]> message);
/**
 * Resets the per-window state and decides whether this window must be replayed.
 * A window is replayed when the operator is idempotent and the window id is not larger than the
 * largest window already completed according to the window data manager; otherwise the consumer
 * wrapper is notified through {@code afterReplay()}.
 *
 * @param wid id of the window that begins
 */
@Override
public void beginWindow(long wid)
{
  emitCount = 0;
  currentWindowId = wid;
  windowStartOffset.clear();

  final boolean replaying = isIdempotent() && wid <= windowDataManager.getLargestCompletedWindow();
  if (!replaying) {
    consumerWrapper.afterReplay();
    return;
  }
  replay(wid);
}
/**
 * Replays a previously completed window: loads the data saved for it by the window data manager
 * (per partition a pair which {@code endWindow} fills with the start offset and the number of
 * consumed offsets) and hands it to the consumer wrapper for immediate emission.
 * An {@link IOException} is rethrown through {@code DTThrowable.rethrow}.
 *
 * @param windowId id of the window to replay
 */
private void replay(long windowId)
{
  try {
    final Object stored = windowDataManager.retrieve(windowId);
    @SuppressWarnings("unchecked")
    final Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>> savedRanges =
        (Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>>)stored;
    consumerWrapper.emitImmediately(savedRanges);
  } catch (IOException e) {
    DTThrowable.rethrow(e);
  }
}
/**
 * Finishes the current window:
 * <ol>
 * <li>stores a copy of the current offsets in {@code offsetHistory}, keyed by the window id;</li>
 * <li>refreshes the metrics from the consumer wrapper;</li>
 * <li>in idempotent mode, saves for every partition touched in this window the pair
 * (first offset, number of offsets consumed) through the window data manager.</li>
 * </ol>
 * An {@link IOException} from saving is rethrown through {@code DTThrowable.rethrow}.
 */
@Override
public void endWindow()
{
  // snapshot the offsets so later windows cannot modify the history entry
  final Map<AbstractKafkaPartitioner.PartitionMeta, Long> snapshot = new HashMap<>(offsetTrack);
  offsetHistory.add(Pair.of(currentWindowId, snapshot));

  metrics.updateMetrics(clusters, consumerWrapper.getAllConsumerMetrics());

  if (!isIdempotent()) {
    return;
  }

  final Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>> ranges = new HashMap<>();
  for (Map.Entry<AbstractKafkaPartitioner.PartitionMeta, Long> first : windowStartOffset.entrySet()) {
    final AbstractKafkaPartitioner.PartitionMeta partition = first.getKey();
    final Long startOffset = first.getValue();
    final long consumed = offsetTrack.get(partition) - startOffset;
    ranges.put(partition, new MutablePair<>(startOffset, consumed));
  }
  try {
    windowDataManager.save(ranges, currentWindowId);
  } catch (IOException e) {
    DTThrowable.rethrow(e);
  }
}
/**
* Initializes the runtime (transient) state of the operator from the operator context:
* application name, consumer wrapper, metrics, window data manager and operator id.
*
* @param context operator context supplied by the platform
*/
@Override
public void setup(Context.OperatorContext context)
{
// exposed later through getApplicationName()
applicationName = context.getValue(Context.DAGContext.APPLICATION_NAME);
// NOTE(review): create(this) presumably reads settings from this operator (e.g. the application name
// assigned above), so the statement order is kept as is - confirm against KafkaConsumerWrapper
consumerWrapper.create(this);
metrics = new KafkaMetrics(metricsRefreshInterval);
windowDataManager.setup(context);
operatorId = context.getId();
}
/**
 * Tears down the window data manager that was set up in {@code setup}.
 */
@Override
public void teardown()
{
  this.windowDataManager.teardown();
}
/**
 * Lazily creates the partitioner that matches the configured strategy.
 * Does nothing when a partitioner already exists.
 *
 * @throws UnsupportedOperationException for ONE_TO_MANY_HEURISTIC, which is not implemented
 * @throws RuntimeException for any other unknown strategy
 */
private void initPartitioner()
{
  if (partitioner != null) {
    return;
  }

  logger.info("Initialize Partitioner");
  switch (strategy) {
    case ONE_TO_ONE:
      partitioner = new OneToOnePartitioner(clusters, topics, this);
      break;
    case ONE_TO_MANY:
      partitioner = new OneToManyPartitioner(clusters, topics, this);
      break;
    case ONE_TO_MANY_HEURISTIC:
      throw new UnsupportedOperationException("Not implemented yet");
    default:
      throw new RuntimeException("Invalid strategy");
  }
  logger.info("Actual Partitioner is {}", partitioner.getClass());
}
/**
 * Stats callback that decides whether the partitioner is consulted.
 * The partitioner is skipped, and a response with {@code repartitionRequired == false} is returned,
 * when either interval is negative, when the last check happened less than
 * {@code repartitionCheckInterval} ms ago, or when the last repartition happened less than
 * {@code repartitionInterval} ms ago. Otherwise the call is delegated to the partitioner and the
 * check time is updated afterwards, even if the partitioner throws.
 *
 * @param batchedOperatorStats stats passed on to the partitioner
 * @return the partitioner's response, or a "no repartition" response when the check is skipped
 */
@Override
public Response processStats(BatchedOperatorStats batchedOperatorStats)
{
  final long now = System.currentTimeMillis();
  final boolean repartitionDisabled = repartitionInterval < 0 || repartitionCheckInterval < 0;
  final boolean checkedRecently = now - lastCheckTime < repartitionCheckInterval;
  final boolean repartitionedRecently = now - lastRepartitionTime < repartitionInterval;

  if (repartitionDisabled || checkedRecently || repartitionedRecently) {
    final Response noRepartition = new Response();
    noRepartition.repartitionRequired = false;
    return noRepartition;
  }

  try {
    logger.debug("Process stats");
    initPartitioner();
    return partitioner.processStats(batchedOperatorStats);
  } finally {
    lastCheckTime = System.currentTimeMillis();
  }
}
/**
 * Delegates partition definition to the partitioner, creating it first if necessary.
 *
 * @param collection the current partitions
 * @param partitioningContext the partitioning context
 * @return the partitions produced by the partitioner
 */
@Override
public Collection<Partition<AbstractKafkaInputOperator>> definePartitions(
    Collection<Partition<AbstractKafkaInputOperator>> collection, PartitioningContext partitioningContext)
{
  logger.debug("Define partitions");
  initPartitioner();
  final Collection<Partition<AbstractKafkaInputOperator>> redefined =
      partitioner.definePartitions(collection, partitioningContext);
  return redefined;
}
/**
 * Records the time of this (re)partition, which {@code processStats} uses to honor
 * {@code repartitionInterval}, and forwards the notification to the partitioner.
 *
 * @param map the partitions keyed by an Integer id, passed on unchanged
 */
@Override
public void partitioned(Map<Integer, Partition<AbstractKafkaInputOperator>> map)
{
  final long now = System.currentTimeMillis();
  lastRepartitionTime = now;
  initPartitioner();
  partitioner.partitioned(map);
}
/**
 * A callback from the consumer after it commits the offsets.
 * Logs the committed offsets at debug level and, when the commit failed, logs a warning together
 * with the exception. The exception is passed as the trailing argument without a placeholder so
 * that SLF4J treats it as the throwable and logs its stack trace.
 *
 * @param map the offsets (per topic partition) the commit applied to
 * @param e the exception raised by the commit, or {@code null} if it succeeded
 */
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e)
{
  final boolean failed = e != null;
  final boolean debugEnabled = logger.isDebugEnabled();
  if (!failed && !debugEnabled) {
    // nothing to log, skip building the offsets string
    return;
  }

  final String offsets = Joiner.on(';').withKeyValueSeparator("=").join(map);
  if (debugEnabled) {
    logger.debug("Commit offsets complete {} ", offsets);
  }
  if (failed) {
    logger.warn("Exceptions in committing offsets {} ", offsets, e);
  }
}
/**
* Sets the partitions assigned to this operator instance.
*
* @param assignment the set of partitions this instance is responsible for
*/
public void assign(Set<AbstractKafkaPartitioner.PartitionMeta> assignment)
{
this.assignment = assignment;
}
/**
 * @return the partitions assigned to this operator instance through {@code assign}; the stored set
 *         itself is returned, not a copy
 */
public Set<AbstractKafkaPartitioner.PartitionMeta> assignment()
{
  return this.assignment;
}
/**
 * @return true when a window data manager is configured and it is not the no-op implementation
 */
private boolean isIdempotent()
{
  if (windowDataManager == null) {
    return false;
  }
  return !(windowDataManager instanceof WindowDataManager.NoopWindowDataManager);
}
//---------------------------------------------setters and getters----------------------------------------
/**
* Sets the initial partition count (see getInitialPartitionCount for when it applies).
*
* @param partitionCount the initial number of partitions
*/
public void setInitialPartitionCount(int partitionCount)
{
this.initialPartitionCount = partitionCount;
}
/**
 * Initial partition count.
 * Only used with PartitionStrategy.ONE_TO_MANY or a customized strategy.
 *
 * @return the configured initial partition count (1 unless changed)
 */
public int getInitialPartitionCount()
{
  return this.initialPartitionCount;
}
/**
 * Sets the Kafka clusters to consume from.
 * The value is a list of bootstrap.servers strings separated by ";". Whitespace around the
 * separators and at both ends of the value is ignored, so {@code "a:9092 ; b:9092"} yields the
 * two clusters {@code "a:9092"} and {@code "b:9092"}; this matches the lenient parsing of
 * {@code setTopics}.
 *
 * @param clusters the ";"-separated cluster list, must not be null
 */
public void setClusters(String clusters)
{
  this.clusters = clusters.trim().split("\\s*;\\s*");
}
/**
 * Same setting as the bootstrap.servers property of KafkaConsumer,
 * refer to http://kafka.apache.org/documentation.html#newconsumerconfigs
 * To support multiple clusters, several bootstrap.servers values can be given, separated by ";".
 *
 * @return the configured clusters joined with ';'
 */
public String getClusters()
{
  final Joiner semicolonJoiner = Joiner.on(';');
  return semicolonJoiner.join(clusters);
}
/**
 * Sets the topics to consume from a ','-separated list.
 * Each name is trimmed and empty entries are dropped.
 *
 * @param topics the ','-separated topic names
 */
public void setTopics(String topics)
{
  final Splitter commaSplitter = Splitter.on(',').trimResults().omitEmptyStrings();
  final Iterable<String> names = commaSplitter.split(topics);
  this.topics = Iterables.toArray(names, String.class);
}
/**
 * The topics the operator consumes, separated by ','.
 * A topic name can only contain ASCII alphanumerics, '.', '_' and '-'.
 *
 * @return the configured topics joined with ", "
 */
public String getTopics()
{
  final Joiner commaJoiner = Joiner.on(", ");
  return commaJoiner.join(topics);
}
/**
 * Sets the partition strategy by name (case-insensitive).
 * The name is upper-cased with {@link Locale#ROOT} so that the lookup does not depend on the
 * default locale of the JVM (with a Turkish default locale, a lower-case 'i' would otherwise be
 * converted to a dotted capital 'İ' and no constant would match).
 *
 * @param policy name of a PartitionStrategy constant, e.g. "one_to_one"
 * @throws IllegalArgumentException if the name matches no PartitionStrategy constant
 */
public void setStrategy(String policy)
{
  this.strategy = PartitionStrategy.valueOf(policy.toUpperCase(Locale.ROOT));
}
/**
 * @return the name of the configured partition strategy constant
 */
public String getStrategy()
{
  return this.strategy.name();
}
/**
 * Sets the initial offset by name (case-insensitive); see {@link InitialOffset} for the values.
 * The name is upper-cased with {@link Locale#ROOT} so that the lookup does not depend on the
 * default locale of the JVM (with a Turkish default locale, "earliest" would otherwise become
 * "EARLİEST" and no constant would match).
 *
 * @param initialOffset name of an InitialOffset constant, e.g. "application_or_latest"
 * @throws IllegalArgumentException if the name matches no InitialOffset constant
 */
public void setInitialOffset(String initialOffset)
{
  this.initialOffset = InitialOffset.valueOf(initialOffset.toUpperCase(Locale.ROOT));
}
/**
 * Initial offset; it is one of the following (case-insensitive when set):
 * <ul>
 * <li>earliest</li>
 * <li>latest</li>
 * <li>application_or_earliest</li>
 * <li>application_or_latest</li>
 * </ul>
 *
 * @return the name of the configured InitialOffset constant
 */
public String getInitialOffset()
{
  return this.initialOffset.name();
}
/**
 * @return the application name read from the operator context in {@code setup}; it is null
 *         before {@code setup} has run
 */
public String getApplicationName()
{
  return this.applicationName;
}
/**
* Sets the extra Kafka consumer properties; the given object is stored as is (no copy).
* See getConsumerProps for the properties that are set by the operator itself.
*
* @param consumerProps extra consumer properties
*/
public void setConsumerProps(Properties consumerProps)
{
this.consumerProps = consumerProps;
}
/**
 * Extra Kafka consumer properties, see
 * http://kafka.apache.org/090/documentation.html#newconsumerconfigs
 *
 * Please be aware that the properties below are set by the operator; do not override them:
 *
 * <ul>
 * <li>bootstrap.servers</li>
 * <li>group.id</li>
 * <li>auto.offset.reset</li>
 * <li>enable.auto.commit</li>
 * <li>partition.assignment.strategy</li>
 * <li>key.deserializer</li>
 * <li>value.deserializer</li>
 * </ul>
 *
 * @return the stored properties object itself (not a copy)
 */
public Properties getConsumerProps()
{
  return this.consumerProps;
}
/**
* Sets the maximum number of tuples emitted per window.
* The backing field is annotated with {@code @Min(1)}.
*
* @param maxTuplesPerWindow the per-window tuple limit
*/
public void setMaxTuplesPerWindow(int maxTuplesPerWindow)
{
this.maxTuplesPerWindow = maxTuplesPerWindow;
}
/**
 * Maximum number of tuples allowed to be emitted in each window.
 *
 * @return the per-window tuple limit (Integer.MAX_VALUE unless changed)
 */
public int getMaxTuplesPerWindow()
{
  return this.maxTuplesPerWindow;
}
/**
 * Consumer timeout. NOTE(review): presumably the timeout passed to KafkaConsumer.poll - confirm
 * against KafkaConsumerWrapper.
 *
 * @return the configured consumer timeout (5000 unless changed)
 * @see <a href="http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll(long)">
 * org.apache.kafka.clients.consumer.KafkaConsumer.poll</a>
 */
public long getConsumerTimeout()
{
  return this.consumerTimeout;
}
/**
* Sets the consumer timeout returned by getConsumerTimeout.
*
* @param consumerTimeout the consumer timeout
*/
public void setConsumerTimeout(long consumerTimeout)
{
this.consumerTimeout = consumerTimeout;
}
/**
 * Number of messages kept in memory waiting for emission to the downstream operator.
 *
 * @return the configured holding buffer size (1024 unless changed)
 */
public int getHoldingBufferSize()
{
  return this.holdingBufferSize;
}
/**
* Sets the number of messages kept in memory waiting for emission to the downstream operator.
*
* @param holdingBufferSize the holding buffer size
*/
public void setHoldingBufferSize(int holdingBufferSize)
{
this.holdingBufferSize = holdingBufferSize;
}
/**
 * Metrics refresh interval; the value is handed to the KafkaMetrics instance created in
 * {@code setup}.
 *
 * @return the configured metrics refresh interval (5000 unless changed)
 */
public long getMetricsRefreshInterval()
{
  return this.metricsRefreshInterval;
}
/**
* Sets the metrics refresh interval; it is read when the metrics object is created in setup.
*
* @param metricsRefreshInterval the metrics refresh interval
*/
public void setMetricsRefreshInterval(long metricsRefreshInterval)
{
this.metricsRefreshInterval = metricsRefreshInterval;
}
/**
* Sets the minimal interval, in milliseconds, between two stats checks in processStats.
* A negative value makes processStats always report that no repartition is required.
*
* @param repartitionCheckInterval the check interval in milliseconds
*/
public void setRepartitionCheckInterval(long repartitionCheckInterval)
{
this.repartitionCheckInterval = repartitionCheckInterval;
}
/**
 * Minimal interval, in milliseconds, between checking the collected stats to decide whether a
 * repartition is needed; also the minimal interval between 2 offset updates.
 *
 * @return the configured check interval (5000 unless changed)
 */
public long getRepartitionCheckInterval()
{
  return this.repartitionCheckInterval;
}
/**
* Sets the minimal interval, in milliseconds, between 2 (re)partition actions.
* A negative value makes processStats always report that no repartition is required.
*
* @param repartitionInterval the repartition interval in milliseconds
*/
public void setRepartitionInterval(long repartitionInterval)
{
this.repartitionInterval = repartitionInterval;
}
/**
 * Minimal interval, in milliseconds, between 2 (re)partition actions.
 *
 * @return the configured repartition interval (30000 unless changed)
 */
public long getRepartitionInterval()
{
  return this.repartitionInterval;
}
/**
* Sets the window data manager. The operator treats itself as idempotent when the manager is
* non-null and not a WindowDataManager.NoopWindowDataManager (see isIdempotent).
*
* @param windowDataManager the window data manager to use
*/
public void setWindowDataManager(WindowDataManager windowDataManager)
{
this.windowDataManager = windowDataManager;
}
/**
 * @return the configured window data manager (a no-op manager unless changed)
 */
public WindowDataManager getWindowDataManager()
{
  return this.windowDataManager;
}
/**
 * Gives access to the tracked offsets: for each partition, the offset following the last message
 * emitted from it. The internal map itself is returned, not a copy.
 *
 * @return current checkpointed offsets
 * @omitFromUI
 */
public Map<AbstractKafkaPartitioner.PartitionMeta, Long> getOffsetTrack()
{
  return this.offsetTrack;
}
}