/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.samza.system.kafka;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import kafka.common.KafkaException;
import kafka.common.TopicAndPartition;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Runs a separate thread that reads messages from Kafka and puts them into the BlockingEnvelopeMap.
* This class is not thread safe. There will be only one instance of this class per KafkaSystemConsumer object.
* We still need some synchronization around kafkaConsumer. See pollConsumer() method for details.
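*
* <p>A minimal usage sketch (hypothetical wiring; the real setup lives in KafkaSystemConsumer):
* <pre>{@code
* KafkaConsumerProxy<byte[], byte[]> proxy =
*     new KafkaConsumerProxy<>(kafkaConsumer, "kafka", clientId, sink, metrics, metricName);
* proxy.addTopicPartition(ssp, startingOffset); // register all SSPs before start()
* proxy.start();                                // spawns the daemon poll thread
* // ... messages are delivered to the BlockingEnvelopeMap via the sink ...
* proxy.stop(1000);                             // signal the poll thread to stop and join it
* }</pre>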
*/
/* package private */ class KafkaConsumerProxy<K, V> {
private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerProxy.class);
private static final int SLEEP_MS_WHILE_NO_TOPIC_PARTITION = 100;
/* package private */ final Thread consumerPollThread;
private final Consumer<K, V> kafkaConsumer;
private final KafkaSystemConsumer.KafkaConsumerMessageSink sink;
private final KafkaSystemConsumerMetrics kafkaConsumerMetrics;
private final String metricName;
private final String systemName;
private final String clientId;
private final Map<TopicPartition, SystemStreamPartition> topicPartitions2SSP = new HashMap<>();
private final Map<SystemStreamPartition, MetricName> perPartitionMetrics = new HashMap<>();
// all the SSPs we poll from, mapped to their next offsets.
private final Map<SystemStreamPartition, Long> nextOffsets = new ConcurrentHashMap<>();
// lag behind the high watermark for each SSP, as reported by the Kafka consumer.
private final Map<SystemStreamPartition, Long> latestLags = new HashMap<>();
private volatile boolean isRunning = false;
private volatile Throwable failureCause = null;
private final CountDownLatch consumerPollThreadStartLatch = new CountDownLatch(1);
/* package private */ KafkaConsumerProxy(Consumer<K, V> kafkaConsumer, String systemName, String clientId,
KafkaSystemConsumer.KafkaConsumerMessageSink messageSink, KafkaSystemConsumerMetrics samzaConsumerMetrics,
String metricName) {
this.kafkaConsumer = kafkaConsumer;
this.systemName = systemName;
this.sink = messageSink;
this.kafkaConsumerMetrics = samzaConsumerMetrics;
this.metricName = metricName;
this.clientId = clientId;
this.kafkaConsumerMetrics.registerClientProxy(metricName);
consumerPollThread = new Thread(createProxyThreadRunnable());
consumerPollThread.setDaemon(true);
consumerPollThread.setName(
"Samza KafkaConsumerProxy Poll " + consumerPollThread.getName() + " - " + systemName);
}
public void start() {
if (!consumerPollThread.isAlive()) {
LOG.info("Starting KafkaConsumerProxy polling thread for system " + systemName + " " + this.toString());
consumerPollThread.start();
// wait until the thread actually starts (or fails to start)
while (!isRunning && failureCause == null) {
try {
consumerPollThreadStartLatch.await(3000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.info("Got InterruptedException", e);
}
}
} else {
LOG.debug("Tried to start an already started KafkaConsumerProxy (%s). Ignoring.", this.toString());
}
}
// add new partition to the list of polled partitions
// this method is called only at the beginning, before the thread is started
public void addTopicPartition(SystemStreamPartition ssp, long nextOffset) {
LOG.info(String.format("Adding new topic and partition %s, offset = %s to queue for consumer %s", ssp, nextOffset,
this));
topicPartitions2SSP.put(KafkaSystemConsumer.toTopicPartition(ssp), ssp); //registered SSPs
// this offset has already been vetted, so there is no need to validate it
LOG.info(String.format("Got offset %s for new topic and partition %s.", nextOffset, ssp));
nextOffsets.put(ssp, nextOffset);
// we reuse the existing metrics, which assume a broker host and port;
// for now the consumer name stands in for the port
kafkaConsumerMetrics.setTopicPartitionValue(metricName, nextOffsets.size());
}
/**
* creates a separate thread for pulling messages
*/
private Runnable createProxyThreadRunnable() {
Runnable runnable = () -> {
isRunning = true;
try {
consumerPollThreadStartLatch.countDown();
LOG.info("Starting runnable " + consumerPollThread.getName());
initializeLags();
while (isRunning) {
fetchMessages();
}
} catch (Throwable throwable) {
LOG.error(String.format("Error in KafkaConsumerProxy poll thread for system: %s.", systemName), throwable);
// KafkaSystemConsumer uses failureCause to propagate the throwable to the container
failureCause = throwable;
isRunning = false;
}
if (!isRunning) {
LOG.info("Stopping the KafkaConsumerProxy poll thread for system: {}.", systemName);
}
};
return runnable;
}
private void initializeLags() {
// This is expensive, so only do it once at the beginning. After the first poll, we can rely on metrics for lag.
Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitions2SSP.keySet());
endOffsets.forEach((tp, offset) -> {
SystemStreamPartition ssp = topicPartitions2SSP.get(tp);
long startingOffset = nextOffsets.get(ssp);
// End offsets are the offset of the newest message + 1.
// If the next message we consume is below the end offset, we are starting with a lag.
long initialLag = offset - startingOffset;
LOG.info("Initial lag for SSP {} is {} (end={}, startOffset={})", ssp, initialLag, offset, startingOffset);
latestLags.put(ssp, initialLag);
sink.setIsAtHighWatermark(ssp, initialLag == 0);
});
// initialize lag metrics
refreshLatencyMetrics();
}
// The actual polling of messages from Kafka, restricted to the requested subset of SSPs.
public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> pollConsumer(
Set<SystemStreamPartition> systemStreamPartitions, long timeout) {
if (topicPartitions2SSP.isEmpty()) {
throw new SamzaException("Cannot poll an empty set of TopicPartitions");
}
// Since we need to poll only a subset of the TopicPartitions (passed as the argument),
// we pause the rest.
List<TopicPartition> topicPartitionsToPause = new ArrayList<>();
List<TopicPartition> topicPartitionsToPoll = new ArrayList<>();
for (Map.Entry<TopicPartition, SystemStreamPartition> e : topicPartitions2SSP.entrySet()) {
TopicPartition tp = e.getKey();
SystemStreamPartition ssp = e.getValue();
if (systemStreamPartitions.contains(ssp)) {
topicPartitionsToPoll.add(tp); // consume
} else {
topicPartitionsToPause.add(tp); // ignore
}
}
ConsumerRecords<K, V> records;
// make a call on the client
try {
// Currently, when checkpointing, we make a safeOffset request through this same client,
// so we need to synchronize access to it. In the future we may use this client for the actual checkpointing.
synchronized (kafkaConsumer) {
// Since we are not polling from ALL the subscribed TopicPartitions, we need to "change" the subscription temporarily
kafkaConsumer.pause(topicPartitionsToPause);
kafkaConsumer.resume(topicPartitionsToPoll);
records = kafkaConsumer.poll(timeout);
// restore the original set of assignments - the paused partitions may be required for checkpointing
kafkaConsumer.resume(topicPartitionsToPause);
}
} catch (InvalidOffsetException e) {
// The consumer throws this exception when auto offset reset is not configured for it,
// so we just rethrow.
LOG.error("Caught InvalidOffsetException in pollConsumer", e);
throw e;
} catch (KafkaException e) {
// we may also get AuthorizationException or other KafkaException subclasses here;
// we log and rethrow so the failure propagates up the stack.
LOG.error("Caught a Kafka exception in pollConsumer", e);
throw e;
}
return processResults(records);
}
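// Converts the polled ConsumerRecords into IncomingMessageEnvelopes grouped per SSP,
// updating the per-record read/byte metrics along the way.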
private Map<SystemStreamPartition, List<IncomingMessageEnvelope>> processResults(ConsumerRecords<K, V> records) {
if (records == null) {
throw new SamzaException("processResults is called with null object for records");
}
int capacity = (int) (records.count() / 0.75 + 1); // size for the expected count at the default 0.75 load factor, to avoid rehashing
Map<SystemStreamPartition, List<IncomingMessageEnvelope>> results = new HashMap<>(capacity);
// Parse the returned records and convert them into IncomingMessageEnvelopes.
// Note: they have already been deserialized by the consumer.
for (ConsumerRecord<K, V> record : records) {
int partition = record.partition();
String topic = record.topic();
TopicPartition tp = new TopicPartition(topic, partition);
updateMetrics(record, tp);
SystemStreamPartition ssp = topicPartitions2SSP.get(tp);
List<IncomingMessageEnvelope> listMsgs = results.computeIfAbsent(ssp, k -> new ArrayList<>());
final K key = record.key();
final Object value = record.value();
final IncomingMessageEnvelope imEnvelope =
new IncomingMessageEnvelope(ssp, String.valueOf(record.offset()), key, value, getRecordSize(record));
listMsgs.add(imEnvelope);
}
if (LOG.isDebugEnabled()) {
LOG.debug("# records per SSP:");
for (Map.Entry<SystemStreamPartition, List<IncomingMessageEnvelope>> e : results.entrySet()) {
List<IncomingMessageEnvelope> list = e.getValue();
LOG.debug(e.getKey() + " = " + ((list == null) ? 0 : list.size()));
}
}
return results;
}
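// Size in bytes of the serialized key plus value; a null key contributes 0.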
private int getRecordSize(ConsumerRecord<K, V> r) {
int keySize = (r.key() == null) ? 0 : r.serializedKeySize();
return keySize + r.serializedValueSize();
}
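// Per-record metrics: message/byte reads and the current offset, plus a high watermark
// derived from the record offset and the latest known lag for this partition.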
private void updateMetrics(ConsumerRecord<K, V> r, TopicPartition tp) {
TopicAndPartition tap = KafkaSystemConsumer.toTopicAndPartition(tp);
SystemStreamPartition ssp = new SystemStreamPartition(systemName, tp.topic(), new Partition(tp.partition()));
long currentSSPLag = getLatestLag(ssp); // lag between the current offset and the highwatermark
if (currentSSPLag < 0) {
return;
}
long recordOffset = r.offset();
long highWatermark = recordOffset + currentSSPLag; // derived value for the highwatermark
int size = getRecordSize(r);
kafkaConsumerMetrics.incReads(tap);
kafkaConsumerMetrics.incBytesReads(tap, size);
kafkaConsumerMetrics.setOffsets(tap, recordOffset);
kafkaConsumerMetrics.incClientBytesReads(metricName, size);
kafkaConsumerMetrics.setHighWatermarkValue(tap, highWatermark);
}
/*
Puts messages into the BlockingEnvelopeMap and advances the per-SSP next offset.
*/
private void moveMessagesToTheirQueue(SystemStreamPartition ssp, List<IncomingMessageEnvelope> envelopes) {
long nextOffset = nextOffsets.get(ssp);
for (IncomingMessageEnvelope env : envelopes) {
sink.addMessage(ssp, env); // move message to the BlockingEnvelopeMap's queue
LOG.trace("IncomingMessageEnvelope. got envelope with offset:{} for ssp={}", env.getOffset(), ssp);
nextOffset = Long.parseLong(env.getOffset()) + 1;
}
nextOffsets.put(ssp, nextOffset);
}
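// Builds the per-partition MetricName used to look up the consumer's records-lag metric.
// Note: the "<topic>-<partition>.records-lag" name under "consumer-fetch-manager-metrics"
// matches the Kafka client version this code was written against; newer clients may
// report lag under a different metric name.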
private void populateMetricNames(Set<SystemStreamPartition> ssps) {
HashMap<String, String> tags = new HashMap<>();
tags.put("client-id", clientId);// this is required by the KafkaConsumer to get the metrics
for (SystemStreamPartition ssp : ssps) {
TopicPartition tp = KafkaSystemConsumer.toTopicPartition(ssp);
perPartitionMetrics.put(ssp, new MetricName(tp + ".records-lag", "consumer-fetch-manager-metrics", "", tags));
}
}
/*
The only way to figure out the lag for the KafkaConsumer is to look at its metrics after each poll() call.
One of the metrics (records-lag) shows how far behind the high watermark the consumer is.
This method populates the lag information for each SSP into the latestLags member variable.
*/
private void populateCurrentLags(Set<SystemStreamPartition> ssps) {
Map<MetricName, ? extends Metric> consumerMetrics = kafkaConsumer.metrics();
// populate the MetricNames first time
if (perPartitionMetrics.isEmpty()) {
populateMetricNames(ssps);
}
for (SystemStreamPartition ssp : ssps) {
MetricName mn = perPartitionMetrics.get(ssp);
Metric currentLagM = consumerMetrics.get(mn);
// The high watermark is fixed to the offset of the last available message,
// so the lag is at least 0, which matches Samza's definition.
// If the lag is not 0, then isAtHead is not true and the kafkaClient keeps polling.
long currentLag = (currentLagM != null) ? (long) currentLagM.value() : -1L;
/*
Metric averageLagM = consumerMetrics.get(new MetricName(tp + ".records-lag-avg", "consumer-fetch-manager-metrics", "", tags));
double averageLag = (averageLagM != null) ? averageLagM.value() : -1.0;
Metric maxLagM = consumerMetrics.get(new MetricName(tp + ".records-lag-max", "consumer-fetch-manager-metrics", "", tags));
double maxLag = (maxLagM != null) ? maxLagM.value() : -1.0;
*/
latestLags.put(ssp, currentLag);
// calls setIsAtHead on the BlockingEnvelopeMap
sink.setIsAtHighWatermark(ssp, currentLag == 0);
}
}
/*
Get the latest lag for a specific SSP.
*/
public long getLatestLag(SystemStreamPartition ssp) {
Long lag = latestLags.get(ssp);
if (lag == null) {
throw new SamzaException("Unknown/unregistered ssp in latestLags request: " + ssp);
}
return lag;
}
/*
Uses the consumer to poll messages from the stream.
*/
private void fetchMessages() {
Set<SystemStreamPartition> sspsToFetch = new HashSet<>();
for (SystemStreamPartition ssp : nextOffsets.keySet()) {
if (sink.needsMoreMessages(ssp)) {
sspsToFetch.add(ssp);
}
}
LOG.debug("pollConsumer {}", sspsToFetch.size());
if (!sspsToFetch.isEmpty()) {
kafkaConsumerMetrics.incClientReads(metricName);
Map<SystemStreamPartition, List<IncomingMessageEnvelope>> response;
LOG.debug("pollConsumer from following SSPs: {}; total#={}", sspsToFetch, sspsToFetch.size());
response = pollConsumer(sspsToFetch, 500); // TODO should be default value from ConsumerConfig
// move the responses into the queue
for (Map.Entry<SystemStreamPartition, List<IncomingMessageEnvelope>> e : response.entrySet()) {
List<IncomingMessageEnvelope> envelopes = e.getValue();
if (envelopes != null) {
moveMessagesToTheirQueue(e.getKey(), envelopes);
}
}
populateCurrentLags(sspsToFetch); // find the current lag for each SSP
} else { // nothing to read
LOG.debug("No topic/partitions need to be fetched for consumer {} right now. Sleeping {}ms.", kafkaConsumer,
SLEEP_MS_WHILE_NO_TOPIC_PARTITION);
kafkaConsumerMetrics.incClientSkippedFetchRequests(metricName);
try {
Thread.sleep(SLEEP_MS_WHILE_NO_TOPIC_PARTITION);
} catch (InterruptedException e) {
LOG.warn("Sleep in fetchMessages was interrupted");
}
}
refreshLatencyMetrics();
}
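// Updates the high-watermark and lag gauges for every registered SSP,
// deriving the stream end offset as nextOffset + lag.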
private void refreshLatencyMetrics() {
for (Map.Entry<SystemStreamPartition, Long> e : nextOffsets.entrySet()) {
SystemStreamPartition ssp = e.getKey();
Long offset = e.getValue();
TopicAndPartition tp = new TopicAndPartition(ssp.getStream(), ssp.getPartition().getPartitionId());
Long lag = latestLags.get(ssp);
LOG.trace("Latest offset of {} is {}; lag = {}", ssp, offset, lag);
if (lag != null && offset != null && lag >= 0) {
long streamEndOffset = offset.longValue() + lag.longValue();
// update the metrics
kafkaConsumerMetrics.setHighWatermarkValue(tp, streamEndOffset);
kafkaConsumerMetrics.setLagValue(tp, lag.longValue());
}
}
}
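// Whether the poll thread is currently running.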
boolean isRunning() {
return isRunning;
}
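// The throwable (if any) that terminated the poll thread; KafkaSystemConsumer uses it
// to propagate the failure to the container.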
Throwable getFailureCause() {
return failureCause;
}
/**
* Stop the poll thread and wait for it to finish.
* @param timeoutMs how long to wait in each join attempt
*/
public void stop(long timeoutMs) {
LOG.info("Shutting down KafkaConsumerProxy poll thread:" + consumerPollThread.getName());
isRunning = false;
try {
consumerPollThread.join(timeoutMs);
// join returns even if the thread didn't finish;
// in that case we interrupt it and wait again
if (consumerPollThread.isAlive()) {
consumerPollThread.interrupt();
consumerPollThread.join(timeoutMs);
}
} catch (InterruptedException e) {
LOG.warn("Join in KafkaConsumerProxy has failed", e);
consumerPollThread.interrupt();
}
}
}