blob: 5fe1fe1c3c0917e04693691a7b2be08bc89822e2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.kafka;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import com.google.common.base.Joiner;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.datatorrent.netlet.util.DTThrowable;
/**
 * This is the wrapper class for the new Kafka consumer API.
 *
 * It starts a number of consumers (one for each cluster), each in its own thread,
 * and maintains the consumer offsets.
 *
 * It also uses the consumers to commit the application-processed offsets along with the application name.
 *
 *
 * @since 3.3.0
 */
@InterfaceStability.Evolving
public class KafkaConsumerWrapper implements Closeable
{
  private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class);

  // Cleared by stop(); the consumer threads keep polling while this is true.
  private final AtomicBoolean isAlive = new AtomicBoolean(false);

  // One consumer per Kafka cluster, keyed by the cluster (bootstrap servers) string.
  private final Map<String, AbstractKafkaConsumer> consumers = new HashMap<>();

  // The in-memory buffer that holds consumed messages until the operator emits them.
  private ArrayBlockingQueue<Pair<String, ConsumerRecord<byte[], byte[]>>> holdingBuffer;

  private AbstractKafkaInputOperator ownerOperator = null;

  private ExecutorService kafkaConsumerExecutor;

  // Offsets waiting to be committed, grouped by cluster; each inner map is owned by one consumer thread.
  private final Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetsToCommit = new HashMap<>();

  // While true the consumer threads idle so the operator can replay buffered windows first.
  private boolean waitForReplay = false;

  private final List<Future<?>> kafkaConsumerThreads = new ArrayList<>();

  /**
   * @return true if any consumer thread has completed or been cancelled, which indicates
   *         an unexpected reader death while the wrapper is supposed to be running
   */
  public boolean hasAnyKafkaReaderThreadDied()
  {
    for (Future<?> future : kafkaConsumerThreads) {
      if (future.isDone() || future.isCancelled()) {
        return true;
      }
    }
    return false;
  }

  /**
   *
   * Only put the offsets that need to be committed into the per-cluster offsetsToCommit map.
   * The consumer thread owning that cluster will commit the offset(s).
   *
   * @param offsetsInWindow offsets processed in the completed window, keyed by partition meta; may be null
   */
  public void commitOffsets(Map<AbstractKafkaPartitioner.PartitionMeta, Long> offsetsInWindow)
  {
    if (offsetsInWindow == null) {
      return;
    }

    // group offsets by cluster and topic partition
    for (Map.Entry<AbstractKafkaPartitioner.PartitionMeta, Long> e : offsetsInWindow.entrySet()) {
      String cluster = e.getKey().getCluster();
      Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetMap = offsetsToCommit.get(cluster);
      if (topicPartitionOffsetMap == null) {
        logger.warn("committed offset map should be initialized by consumer thread!");
        continue;
      }
      topicPartitionOffsetMap.put(e.getKey().getTopicPartition(), new OffsetAndMetadata(e.getValue()));
    }
  }

  /**
   * Synchronously replays the given windows: for each partition, pauses all sibling
   * partitions, seeks to the window start offset, emits exactly the recorded number of
   * tuples, then seeks past the window. All partitions are resumed afterwards.
   *
   * @param windowData partition meta mapped to (start offset, tuple count) pairs to replay
   */
  public void emitImmediately(Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>> windowData)
  {
    for (Map.Entry<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>> windowEntry : windowData.entrySet()) {
      AbstractKafkaPartitioner.PartitionMeta meta = windowEntry.getKey();
      Pair<Long, Long> replayOffsetSize = windowEntry.getValue();
      AbstractKafkaConsumer kc = consumers.get(meta.getCluster());
      // BUGFIX: the original condition was "kc == null && kc.isConsumerContainsPartition(...)",
      // which throws NPE when kc is null and never fires when the consumer lacks the partition
      if (kc == null || !kc.isConsumerContainsPartition(windowEntry.getKey().getTopicPartition())) {
        throw new RuntimeException("Coundn't find consumer to replay the message PartitionMeta : " + meta);
      }
      // pause the other partitions so poll() only returns records for the one being replayed
      for (TopicPartition tp : kc.getPartitions()) {
        if (meta.getTopicPartition().equals(tp)) {
          kc.resumePartition(tp);
        } else {
          try {
            kc.positionPartition(tp);
          } catch (NoOffsetForPartitionException e) {
            // the poll() method of a consumer will throw exception
            // if any of subscribed consumers not initialized with position
            handleNoOffsetForPartitionException(e, kc);
          }
          kc.pausePartition(tp);
        }
      }

      // set the offset to the window start offset
      kc.seekToOffset(meta.getTopicPartition(), replayOffsetSize.getLeft());

      long windowCount = replayOffsetSize.getRight();
      while (windowCount > 0) {
        try {
          ConsumerRecords<byte[], byte[]> records = kc.pollRecords(ownerOperator.getConsumerTimeout());
          for (Iterator<ConsumerRecord<byte[], byte[]>> cri = records.iterator(); cri.hasNext() && windowCount > 0; ) {
            ownerOperator.emitTuple(meta.getCluster(), cri.next());
            windowCount--;
          }
        } catch (NoOffsetForPartitionException e) {
          throw new RuntimeException("Couldn't replay the offset", e);
        }
      }

      // set the offset to just after the replayed window
      kc.seekToOffset(meta.getTopicPartition(), replayOffsetSize.getLeft() + replayOffsetSize.getRight());
    }

    // resume all partitions of all consumers
    for (AbstractKafkaConsumer kc : consumers.values()) {
      kc.resumeAllPartitions();
    }
  }

  /**
   * Signals the consumer threads that replay is finished and normal polling may resume.
   */
  public void afterReplay()
  {
    waitForReplay = false;
  }

  /**
   * Poll loop for one cluster: commits pending offsets, polls records and hands them
   * to the wrapper's holding buffer until the wrapper is stopped.
   */
  static final class ConsumerThread implements Runnable
  {
    private final AbstractKafkaConsumer consumer;

    private final String cluster;

    private final KafkaConsumerWrapper wrapper;

    // Offsets this thread should commit; filled by commitOffsets() on the operator thread,
    // hence a ConcurrentHashMap.
    private Map<TopicPartition, OffsetAndMetadata> offsetToCommit = null;

    public ConsumerThread(String cluster, AbstractKafkaConsumer consumer, KafkaConsumerWrapper wrapper)
    {
      this.cluster = cluster;
      this.consumer = consumer;
      this.wrapper = wrapper;
      this.offsetToCommit = new ConcurrentHashMap<>();
      wrapper.offsetsToCommit.put(cluster, offsetToCommit);
    }

    @Override
    public void run()
    {
      try {
        while (wrapper.isAlive.get()) {
          if (wrapper.waitForReplay) {
            Thread.sleep(100);
            continue;
          }
          if (!this.offsetToCommit.isEmpty()) {
            // in each fetch cycle commit the offset if needed
            if (logger.isDebugEnabled()) {
              logger.debug("Commit offsets {}", Joiner.on(';').withKeyValueSeparator("=").join(this.offsetToCommit));
            }
            consumer.commitAsync(offsetToCommit, wrapper.ownerOperator);
            offsetToCommit.clear();
          }
          try {
            ConsumerRecords<byte[], byte[]> records = consumer.pollRecords(wrapper.ownerOperator.getConsumerTimeout());
            for (ConsumerRecord<byte[], byte[]> record : records) {
              wrapper.putMessage(Pair.of(cluster, record));
            }
          } catch (NoOffsetForPartitionException e) {
            wrapper.handleNoOffsetForPartitionException(e, consumer);
          } catch (InterruptedException e) {
            throw new IllegalStateException("Consumer thread is interrupted unexpectedly", e);
          }
        }
      } catch (WakeupException we) {
        // expected during shutdown: stop() calls wakeup() on the consumer, so INFO not ERROR
        logger.info("The consumer is being stopped");
      } catch (InterruptedException e) {
        // BUGFIX: pass the throwable as the last argument (no "{}" placeholder) so SLF4J
        // logs the stack trace, and restore the interrupt flag before rethrowing
        logger.error("Kafka consumer thread was interrupted.", e);
        Thread.currentThread().interrupt();
        DTThrowable.rethrow(e);
      } catch (Throwable ex) {
        // BUGFIX: same as above — "{}" with a Throwable argument swallowed the stack trace
        logger.error("Kafka consumer wrapper thread failed with an exception", ex);
      } finally {
        logger.info("Exiting Kafka consumer thread.");
        consumer.close();
      }
    }
  }

  /**
   * Seeks to the beginning or end of the partitions that have no committed offset,
   * based on the operator's initial offset setting.
   *
   * @param e        the exception reporting the partitions without a position
   * @param consumer the consumer to seek on
   */
  protected void handleNoOffsetForPartitionException(NoOffsetForPartitionException e,
      AbstractKafkaConsumer consumer)
  {
    // if initialOffset is set to EARLIEST or LATEST
    // and the application is run for the first time,
    // then there is no existing committed offset and this error will be caught;
    // we need to seek to either beginning or end of the partition
    // based on the initial offset setting
    AbstractKafkaInputOperator.InitialOffset io =
        AbstractKafkaInputOperator.InitialOffset.valueOf(ownerOperator.getInitialOffset());
    if (io == AbstractKafkaInputOperator.InitialOffset.APPLICATION_OR_EARLIEST
        || io == AbstractKafkaInputOperator.InitialOffset.EARLIEST) {
      consumer.seekToBeginning(e.partitions().toArray(new TopicPartition[0]));
    } else {
      consumer.seekToEnd(e.partitions().toArray(new TopicPartition[0]));
    }
  }

  /**
   * This method is called in setup method of Abstract Kafka Input Operator
   *
   * @param ownerOperator the operator that owns this wrapper
   */
  public void create(AbstractKafkaInputOperator ownerOperator)
  {
    holdingBuffer = new ArrayBlockingQueue<>(ownerOperator.getHoldingBufferSize());
    this.ownerOperator = ownerOperator;
    logger.info("Create consumer wrapper with holding buffer size: {} ", ownerOperator.getHoldingBufferSize());
    if (logger.isInfoEnabled()) {
      logger.info("Assignments are {} ", Joiner.on('\n').join(ownerOperator.assignment()));
    }
  }

  /**
   * This method is called in the activate method of the operator.
   * Starts one consumer thread per cluster, restores tracked offsets, and begins polling.
   *
   * @param waitForReplay true if the consumer threads should idle until replay completes
   */
  public void start(boolean waitForReplay)
  {
    this.waitForReplay = waitForReplay;
    isAlive.set(true);

    // thread to consume the kafka data
    // create thread pool for consumer threads
    kafkaConsumerExecutor = Executors.newCachedThreadPool(
        new ThreadFactoryBuilder().setNameFormat("kafka-consumer-%d").build());

    // group list of PartitionMeta by cluster
    Map<String, List<TopicPartition>> consumerAssignment = new HashMap<>();
    Set<AbstractKafkaPartitioner.PartitionMeta> assignments = ownerOperator.assignment();
    for (AbstractKafkaPartitioner.PartitionMeta partitionMeta : assignments) {
      String cluster = partitionMeta.getCluster();
      List<TopicPartition> cAssignment = consumerAssignment.get(cluster);
      if (cAssignment == null) {
        cAssignment = new LinkedList<>();
        consumerAssignment.put(cluster, cAssignment);
      }
      cAssignment.add(new TopicPartition(partitionMeta.getTopic(), partitionMeta.getPartitionId()));
    }

    Map<AbstractKafkaPartitioner.PartitionMeta, Long> currentOffset = ownerOperator.getOffsetTrack();

    // create one thread for each cluster
    // each thread use one KafkaConsumer to consume from 1+ partition(s) of 1+ topic(s)
    for (Map.Entry<String, List<TopicPartition>> e : consumerAssignment.entrySet()) {

      Properties prop = new Properties();
      if (ownerOperator.getConsumerProps() != null) {
        prop.putAll(ownerOperator.getConsumerProps());
      }

      prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, e.getKey());
      // "none" makes a missing committed offset surface as NoOffsetForPartitionException,
      // which is then handled according to the configured initial offset
      prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
      // never auto commit the offsets
      prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
      prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
      prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
      AbstractKafkaInputOperator.InitialOffset initialOffset =
          AbstractKafkaInputOperator.InitialOffset.valueOf(ownerOperator.getInitialOffset());

      if (initialOffset == AbstractKafkaInputOperator.InitialOffset.APPLICATION_OR_EARLIEST ||
          initialOffset == AbstractKafkaInputOperator.InitialOffset.APPLICATION_OR_LATEST) {
        // commit the offset with application name if we set initialoffset to application
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, ownerOperator.getApplicationName() + "_Consumer");
      }

      AbstractKafkaConsumer kc = ownerOperator.createConsumer(prop);
      kc.assignPartitions(e.getValue());
      if (logger.isInfoEnabled()) {
        logger.info("Create consumer with properties {} ", Joiner.on(";").withKeyValueSeparator("=").join(prop));
        logger.info("Assign consumer to {}", Joiner.on('#').join(e.getValue()));
      }
      // restore the positions tracked from the previous run, if any
      if (currentOffset != null && !currentOffset.isEmpty()) {
        for (TopicPartition tp : e.getValue()) {
          AbstractKafkaPartitioner.PartitionMeta partitionKey =
              new AbstractKafkaPartitioner.PartitionMeta(e.getKey(), tp.topic(), tp.partition());
          if (currentOffset.containsKey(partitionKey)) {
            kc.seekToOffset(tp, currentOffset.get(partitionKey));
          }
        }
      }

      consumers.put(e.getKey(), kc);
      Future<?> future = kafkaConsumerExecutor.submit(new ConsumerThread(e.getKey(), kc, this));
      kafkaConsumerThreads.add(future);
    }
  }

  /**
   * The method is called in the deactivate method of the operator.
   * Wakes every consumer so a blocked poll() returns, then shuts the pool down.
   */
  public void stop()
  {
    isAlive.set(false);
    for (AbstractKafkaConsumer c : consumers.values()) {
      // makes the consumer thread's blocking poll() throw WakeupException and exit
      c.wakeup();
    }
    kafkaConsumerExecutor.shutdownNow();
    holdingBuffer.clear();
    IOUtils.closeQuietly(this);
  }

  /**
   * This method is called in teardown method of the operator
   */
  public void teardown()
  {
    holdingBuffer.clear();
  }

  /**
   * @return the next buffered (cluster, record) pair, or null if the buffer is empty
   */
  public Pair<String, ConsumerRecord<byte[], byte[]>> pollMessage()
  {
    return holdingBuffer.poll();
  }

  /**
   * @return the number of messages currently held in the buffer
   */
  public int messageSize()
  {
    return holdingBuffer.size();
  }

  /**
   * Puts a message into the holding buffer; blocks when the buffer is full so the
   * consumer thread stops receiving more messages until the operator drains it.
   */
  protected final void putMessage(Pair<String, ConsumerRecord<byte[], byte[]>> msg) throws InterruptedException
  {
    // block from receiving more message
    holdingBuffer.put(msg);
  }

  @Override
  public void close() throws IOException
  {
    // nothing to release here; consumers are closed by their own threads in stop()
  }

  /**
   * @return the metrics of every consumer, keyed by cluster
   */
  public Map<String, Map<MetricName, ? extends Metric>> getAllConsumerMetrics()
  {
    Map<String, Map<MetricName, ? extends Metric>> val = new HashMap<>();
    for (Map.Entry<String, AbstractKafkaConsumer> e : consumers.entrySet()) {
      val.put(e.getKey(), e.getValue().metrics());
    }
    return val;
  }
}