/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.heron.spouts.kafka;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Queue;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.heron.api.Config;
import org.apache.heron.api.spout.BaseRichSpout;
import org.apache.heron.api.spout.SpoutOutputCollector;
import org.apache.heron.api.state.State;
import org.apache.heron.api.topology.IStatefulComponent;
import org.apache.heron.api.topology.OutputFieldsDeclarer;
import org.apache.heron.api.topology.TopologyContext;
import org.apache.heron.api.tuple.Fields;
import org.apache.heron.common.basics.SingletonRegistry;
import org.apache.heron.common.config.SystemConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
/**
* Kafka spout that consumes data from Kafka topic(s); each record is converted into a tuple via {@link ConsumerRecordTransformer} and emitted into the topology
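* <p>
* A minimal wiring sketch, assuming the {@code DefaultKafkaConsumerFactory} shipped in this
* package; the broker address, group id, topic name, and parallelism below are placeholders:
* <pre>{@code
* Map<String, Object> consumerConfig = new HashMap<>();
* consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
* consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
* consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
* consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
* KafkaSpout<String, String> spout = new KafkaSpout<>(
*     new DefaultKafkaConsumerFactory<>(consumerConfig),
*     Collections.singletonList("my-topic"));
* TopologyBuilder builder = new TopologyBuilder();
* builder.setSpout("kafka-spout", spout, 1);
* }</pre>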
*
* @param <K> the type of the key field of the Kafka record
* @param <V> the type of the value field of the Kafka record
*/
public class KafkaSpout<K, V> extends BaseRichSpout
implements IStatefulComponent<TopicPartition, Long> {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
private static final long serialVersionUID = -2271355516537883361L;
private int metricsIntervalInSecs = 60;
private KafkaConsumerFactory<K, V> kafkaConsumerFactory;
private TopicPatternProvider topicPatternProvider;
private Collection<String> topicNames;
private ConsumerRecordTransformer<K, V> consumerRecordTransformer =
new DefaultConsumerRecordTransformer<>();
private transient SpoutOutputCollector collector;
private transient TopologyContext topologyContext;
private transient Queue<ConsumerRecord<K, V>> buffer;
private transient Consumer<K, V> consumer;
private transient Set<MetricName> reportedMetrics;
private transient Set<TopicPartition> assignedPartitions;
private transient Map<TopicPartition, NavigableMap<Long, Long>> ackRegistry;
private transient Map<TopicPartition, Long> failureRegistry;
private Config.TopologyReliabilityMode topologyReliabilityMode =
Config.TopologyReliabilityMode.ATMOST_ONCE;
private long previousKafkaMetricsUpdatedTimestamp = 0;
private State<TopicPartition, Long> state;
/**
* create a KafkaSpout instance that subscribes to a list of topics
*
* @param kafkaConsumerFactory kafka consumer factory
* @param topicNames list of topic names
*/
public KafkaSpout(KafkaConsumerFactory<K, V> kafkaConsumerFactory,
Collection<String> topicNames) {
this.kafkaConsumerFactory = kafkaConsumerFactory;
this.topicNames = topicNames;
}
/**
* create a KafkaSpout instance that subscribes to all topics matching the topic pattern
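* (for example, {@code new DefaultTopicPatternProvider("my-topic-.*")}, assuming the
* {@code DefaultTopicPatternProvider} shipped in this package)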
*
* @param kafkaConsumerFactory kafka consumer factory
* @param topicPatternProvider provider of the topic matching pattern
*/
public KafkaSpout(KafkaConsumerFactory<K, V> kafkaConsumerFactory,
TopicPatternProvider topicPatternProvider) {
this.kafkaConsumerFactory = kafkaConsumerFactory;
this.topicPatternProvider = topicPatternProvider;
}
/**
* return the consumer record transformer
*
* @return the Kafka record transformer instance used by this Kafka Spout
*/
@SuppressWarnings("WeakerAccess")
public ConsumerRecordTransformer<K, V> getConsumerRecordTransformer() {
return consumerRecordTransformer;
}
/**
* set the Kafka record transformer
*
* @param consumerRecordTransformer kafka record transformer
*/
@SuppressWarnings("WeakerAccess")
public void setConsumerRecordTransformer(ConsumerRecordTransformer<K, V>
consumerRecordTransformer) {
this.consumerRecordTransformer = consumerRecordTransformer;
}
@Override
public void open(Map<String, Object> conf, TopologyContext context,
SpoutOutputCollector aCollector) {
this.collector = aCollector;
this.topologyContext = context;
initialize(conf);
}
@Override
public void initState(State<TopicPartition, Long> aState) {
this.state = aState;
LOG.info("initial state {}", aState);
}
@Override
public void preSave(String checkpointId) {
LOG.info("save state {}", state);
consumer.commitAsync(state.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey,
entry -> new OffsetAndMetadata(entry.getValue() + 1))),
null);
}
@Override
public void nextTuple() {
ConsumerRecord<K, V> record = buffer.poll();
if (record != null) {
// there are still records remaining for emission from the previous poll
emitConsumerRecord(record);
} else {
//all the records from the previous poll have been emitted,
//or this is the very first poll
if (topologyReliabilityMode == Config.TopologyReliabilityMode.ATLEAST_ONCE) {
ackRegistry.forEach((key, value) -> {
if (value != null) {
//seek back to the earliest failed offset if there is any
rewindAndDiscardAck(key, value);
//commit based on the first continuous acknowledgement range
manualCommit(key, value);
}
});
}
poll().forEach(kvConsumerRecord -> buffer.offer(kvConsumerRecord));
}
}
@Override
public void activate() {
if (!assignedPartitions.isEmpty()) {
consumer.resume(assignedPartitions);
}
}
@Override
public void deactivate() {
if (!assignedPartitions.isEmpty()) {
consumer.pause(assignedPartitions);
}
}
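//Acknowledged offsets are tracked per partition as disjoint, inclusive ranges in a
//NavigableMap of range start -> range end. For example, acking offsets 3, 5 and then 4
//evolves the map as {3=3} -> {3=3, 5=5} -> {3=5}: the ack of 4 bridges the two adjacent
//ranges into one. Only the lowest range is eligible for committing (see manualCommit).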
@SuppressWarnings("Duplicates")
@Override
public void ack(Object msgId) {
long start = System.nanoTime();
ConsumerRecordMessageId consumerRecordMessageId = (ConsumerRecordMessageId) msgId;
TopicPartition topicPartition = consumerRecordMessageId.getTopicPartition();
if (!assignedPartitions.contains(topicPartition)) {
LOG.info("ignore {} because it's been revoked", consumerRecordMessageId);
return;
}
long offset = consumerRecordMessageId.getOffset();
ackRegistry.putIfAbsent(topicPartition, new TreeMap<>());
NavigableMap<Long, Long> navigableMap = ackRegistry.get(topicPartition);
Map.Entry<Long, Long> floorRange = navigableMap.floorEntry(offset);
Map.Entry<Long, Long> ceilingRange = navigableMap.ceilingEntry(offset);
long floorBottom = floorRange != null ? floorRange.getKey() : Long.MIN_VALUE;
long floorTop = floorRange != null ? floorRange.getValue() : Long.MIN_VALUE;
long ceilingBottom = ceilingRange != null ? ceilingRange.getKey() : Long.MAX_VALUE;
long ceilingTop = ceilingRange != null ? ceilingRange.getValue() : Long.MAX_VALUE;
//the ack is for a message that has already been acknowledged.
//This happens when a failed tuple has caused the Kafka consumer
//to seek back to an earlier position, and some messages are replayed.
if ((offset >= floorBottom && offset <= floorTop)
|| (offset >= ceilingBottom && offset <= ceilingTop)) {
return;
}
if (ceilingBottom - floorTop == 2) {
//the ack connects the two adjacent ranges
navigableMap.put(floorBottom, ceilingTop);
navigableMap.remove(ceilingBottom);
} else if (offset == floorTop + 1) {
//the acknowledged offset is the immediate neighbour
// of the upper bound of the floor range
navigableMap.put(floorBottom, offset);
} else if (offset == ceilingBottom - 1) {
//the acknowledged offset is the immediate neighbour
// of the lower bound of the ceiling range
navigableMap.remove(ceilingBottom);
navigableMap.put(offset, ceilingTop);
} else {
//the ack starts a new disjoint range
navigableMap.put(offset, offset);
}
LOG.debug("ack {} in {} ns", msgId, System.nanoTime() - start);
LOG.debug("{}", ackRegistry.get(consumerRecordMessageId.getTopicPartition()));
}
@Override
public void fail(Object msgId) {
ConsumerRecordMessageId consumerRecordMessageId = (ConsumerRecordMessageId) msgId;
TopicPartition topicPartition = consumerRecordMessageId.getTopicPartition();
if (!assignedPartitions.contains(topicPartition)) {
LOG.info("ignore {} because it's been revoked", consumerRecordMessageId);
return;
}
long offset = consumerRecordMessageId.getOffset();
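//keep only the earliest failed offset per partition; the next nextTuple() pass
//rewinds the consumer to it and replays every record from there on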
failureRegistry.put(topicPartition,
Math.min(failureRegistry.getOrDefault(topicPartition,
Long.MAX_VALUE), offset));
LOG.warn("fail {}", msgId);
}
@Override
public void close() {
consumer.close();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
consumerRecordTransformer.getOutputStreams()
.forEach(s -> declarer.declareStream(s,
new Fields(consumerRecordTransformer.getFieldNames(s))));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
private void initialize(Map<String, Object> conf) {
topologyReliabilityMode = Config.TopologyReliabilityMode.valueOf(
conf.get(Config.TOPOLOGY_RELIABILITY_MODE).toString());
metricsIntervalInSecs = (int) ((SystemConfig) SingletonRegistry.INSTANCE
.getSingleton(SystemConfig.HERON_SYSTEM_CONFIG))
.getHeronMetricsExportInterval().getSeconds();
consumer = kafkaConsumerFactory.create();
if (topicNames != null) {
consumer.subscribe(topicNames, new KafkaConsumerRebalanceListener());
} else {
consumer.subscribe(topicPatternProvider.create(), new KafkaConsumerRebalanceListener());
}
buffer = new ArrayDeque<>(500);
ackRegistry = new HashMap<>();
failureRegistry = new HashMap<>();
assignedPartitions = new HashSet<>();
reportedMetrics = new HashSet<>();
}
private void emitConsumerRecord(ConsumerRecord<K, V> record) {
Map<String, List<Object>> tupleByStream = consumerRecordTransformer.transform(record);
//nothing worth emitting out of this record,
//so immediately acknowledge it if in ATLEAST_ONCE mode
if (tupleByStream.isEmpty() && topologyReliabilityMode
== Config.TopologyReliabilityMode.ATLEAST_ONCE) {
ack(new ConsumerRecordMessageId(new TopicPartition(record.topic(), record.partition()),
record.offset()));
return;
}
tupleByStream.forEach((s, objects) -> {
switch (topologyReliabilityMode) {
case ATMOST_ONCE:
collector.emit(s, objects);
break;
case ATLEAST_ONCE:
//build message id based on topic, partition, offset of the consumer record
ConsumerRecordMessageId consumerRecordMessageId =
new ConsumerRecordMessageId(new TopicPartition(record.topic(),
record.partition()), record.offset());
//emit tuple with the message id
collector.emit(s, objects, consumerRecordMessageId);
break;
case EFFECTIVELY_ONCE:
collector.emit(s, objects);
//only in EFFECTIVELY_ONCE mode do we need to track the offset of the record
//that was just emitted into the topology
state.put(new TopicPartition(record.topic(), record.partition()),
record.offset());
break;
default:
LOG.warn("unsupported reliability mode {}", topologyReliabilityMode);
}
});
}
private void rewindAndDiscardAck(TopicPartition topicPartition,
NavigableMap<Long, Long> ackRanges) {
if (failureRegistry.containsKey(topicPartition)) {
long earliestFailedOffset = failureRegistry.get(topicPartition);
//rewind back to the earliest failed offset
consumer.seek(topicPartition, earliestFailedOffset);
//discard acknowledged ranges above the earliest failed offset, if there are any,
//because the consumer has been rewound and those records will be replayed
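//NavigableMap.headMap(toKey) is exclusive, so only ranges starting strictly below
//the earliest failed offset survive; the last surviving range is then capped so it
//does not extend past the failed offset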
SortedMap<Long, Long> sortedMap = ackRanges.headMap(earliestFailedOffset);
if (!sortedMap.isEmpty()) {
sortedMap.put(sortedMap.lastKey(),
Math.min(earliestFailedOffset,
sortedMap.get(sortedMap.lastKey())));
}
ackRegistry.put(topicPartition, new TreeMap<>(sortedMap));
//failure for this partition has been dealt with
failureRegistry.remove(topicPartition);
}
}
private void manualCommit(TopicPartition topicPartition, NavigableMap<Long, Long> ackRanges) {
//the first entry in the acknowledgement registry keeps track of
//the lowest possible offset that can be committed
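//as in preSave(), commit firstEntry.getValue() + 1: the committed offset is the
//next offset to consume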
Map.Entry<Long, Long> firstEntry = ackRanges.firstEntry();
if (firstEntry != null) {
consumer.commitAsync(Collections.singletonMap(topicPartition,
new OffsetAndMetadata(firstEntry.getValue() + 1)), null);
}
}
private Iterable<ConsumerRecord<K, V>> poll() {
ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(200));
if (!records.isEmpty()) {
if (System.currentTimeMillis() - previousKafkaMetricsUpdatedTimestamp
> metricsIntervalInSecs * 1000) {
registerConsumerMetrics();
previousKafkaMetricsUpdatedTimestamp = System.currentTimeMillis();
}
if (topologyReliabilityMode == Config.TopologyReliabilityMode.ATMOST_ONCE) {
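//in ATMOST_ONCE mode, commit right after polling (commitAsync() commits the offsets
//returned by the last poll), so records are not redelivered even if they fail downstream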
consumer.commitAsync();
}
return records;
}
return Collections.emptyList();
}
private void registerConsumerMetrics() {
consumer.metrics().forEach((metricName, o) -> {
if (!reportedMetrics.contains(metricName)) {
reportedMetrics.add(metricName);
String exposedName = extractKafkaMetricName(metricName);
LOG.info("register Kakfa Consumer metric {}", exposedName);
topologyContext.registerMetric(exposedName, new KafkaMetricDecorator<>(o),
metricsIntervalInSecs);
}
});
}
private String extractKafkaMetricName(MetricName metricName) {
StringBuilder builder = new StringBuilder()
.append(metricName.name())
.append('-')
.append(metricName.group());
metricName.tags().forEach((s, s2) -> builder.append('-')
.append(s)
.append('-')
.append(s2));
LOG.info("register Kakfa Consumer metric {}", builder);
return builder.toString();
}
static class ConsumerRecordMessageId {
private TopicPartition topicPartition;
private long offset;
ConsumerRecordMessageId(TopicPartition topicPartition, long offset) {
this.topicPartition = topicPartition;
this.offset = offset;
}
@Override
public String toString() {
return "ConsumerRecordMessageId{"
+ "topicPartition=" + topicPartition
+ ", offset=" + offset
+ '}';
}
TopicPartition getTopicPartition() {
return topicPartition;
}
long getOffset() {
return offset;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ConsumerRecordMessageId that = (ConsumerRecordMessageId) o;
if (offset != that.offset) {
return false;
}
return topicPartition.equals(that.topicPartition);
}
@Override
public int hashCode() {
int result = topicPartition.hashCode();
result = 31 * result + (int) (offset ^ (offset >>> 32));
return result;
}
}
public class KafkaConsumerRebalanceListener implements ConsumerRebalanceListener {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
LOG.info("revoked partitions {}", collection);
if (topologyReliabilityMode == Config.TopologyReliabilityMode.ATLEAST_ONCE) {
collection.forEach(topicPartition -> {
ackRegistry.remove(topicPartition);
failureRegistry.remove(topicPartition);
});
} else if (topologyReliabilityMode == Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
collection.forEach(topicPartition -> state.remove(topicPartition));
}
assignedPartitions.removeAll(collection);
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
LOG.info("assigned partitions {}", collection);
if (topologyReliabilityMode == Config.TopologyReliabilityMode.EFFECTIVELY_ONCE) {
collection.forEach(topicPartition -> {
if (state.containsKey(topicPartition)) {
consumer.seek(topicPartition, state.get(topicPartition));
}
});
}
assignedPartitions.addAll(collection);
}
}
}