blob: f69350e79ad5ebeb84b72c0e062176f87560b4ae [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.twill.internal.kafka.client;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import org.apache.twill.common.Cancellable;
import org.apache.twill.common.Threads;
import org.apache.twill.kafka.client.BrokerInfo;
import org.apache.twill.kafka.client.BrokerService;
import org.apache.twill.kafka.client.FetchedMessage;
import org.apache.twill.kafka.client.KafkaConsumer;
import org.apache.twill.kafka.client.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.ConnectException;
import java.nio.channels.ClosedByInterruptException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
 * A {@link KafkaConsumer} implementation using the scala kafka api.
 */
final class SimpleKafkaConsumer implements KafkaConsumer {

  private static final Logger LOG = LoggerFactory.getLogger(SimpleKafkaConsumer.class);

  private static final int FETCH_SIZE = 1024 * 1024; // Use a default fetch size.
  private static final int SO_TIMEOUT = 5 * 1000; // 5 seconds.
  private static final int MAX_WAIT = 1000; // 1 second.
  private static final long CONSUMER_EXPIRE_MINUTES = 1L; // close consumer if not used for 1 minute.
  private static final long INIT_CONSUMER_FAILURE_BACKOFF = 100L; // Initial backoff for 100ms if failure in consumer.
  private static final long MAX_CONSUMER_FAILURE_BACKOFF = 10000L; // Backoff max for 10 seconds if failure in consumer.
  private static final long EMPTY_FETCH_WAIT = 500L; // Sleep for 500 ms if no message is fetched.

  // Used to discover the leader broker for each topic partition.
  private final BrokerService brokerService;
  // One SimpleConsumer per broker, created lazily; idle entries expire and are closed by the removal listener.
  private final LoadingCache<BrokerInfo, SimpleConsumer> consumers;
  // Cancellables of all outstanding consume() calls, drained and cancelled by stop().
  private final BlockingQueue<Cancellable> consumerCancels;

  SimpleKafkaConsumer(BrokerService brokerService) {
    this.brokerService = brokerService;
    // Consumers auto-expire after CONSUMER_EXPIRE_MINUTES without access and are closed on eviction.
    this.consumers = CacheBuilder.newBuilder()
      .expireAfterAccess(CONSUMER_EXPIRE_MINUTES, TimeUnit.MINUTES)
      .removalListener(createRemovalListener())
      .build(createConsumerLoader());
    this.consumerCancels = new LinkedBlockingQueue<Cancellable>();
  }
@Override
public Preparer prepare() {
return new SimplePreparer();
}
/**
* Called to stop all consumers created. This method should only be
* called by KafkaClientService who own this consumer.
*/
void stop() {
LOG.info("Stopping Kafka consumer");
List<Cancellable> cancels = Lists.newLinkedList();
consumerCancels.drainTo(cancels);
for (Cancellable cancel : cancels) {
cancel.cancel();
}
consumers.invalidateAll();
LOG.info("Kafka Consumer stopped");
}
/**
* Creates a CacheLoader for creating SimpleConsumer.
*/
private CacheLoader<BrokerInfo, SimpleConsumer> createConsumerLoader() {
return new CacheLoader<BrokerInfo, SimpleConsumer>() {
@Override
public SimpleConsumer load(BrokerInfo key) throws Exception {
return new SimpleConsumer(key.getHost(), key.getPort(), SO_TIMEOUT, FETCH_SIZE, "simple-kafka-client");
}
};
}
/**
* Creates a RemovalListener that will close SimpleConsumer on cache removal.
*/
private RemovalListener<BrokerInfo, SimpleConsumer> createRemovalListener() {
return new RemovalListener<BrokerInfo, SimpleConsumer>() {
@Override
public void onRemoval(RemovalNotification<BrokerInfo, SimpleConsumer> notification) {
SimpleConsumer consumer = notification.getValue();
if (consumer != null) {
consumer.close();
}
}
};
}
/**
* Retrieves the last offset before the given timestamp for a given topic partition.
*
* @return The last offset before the given timestamp or {@code 0} if failed to do so.
*/
private long getLastOffset(TopicPartition topicPart, long timestamp) {
BrokerInfo brokerInfo = brokerService.getLeader(topicPart.getTopic(), topicPart.getPartition());
SimpleConsumer consumer = brokerInfo == null ? null : consumers.getUnchecked(brokerInfo);
// If no broker, treat it as failure attempt.
if (consumer == null) {
LOG.warn("Failed to talk to any broker. Default offset to 0 for {}", topicPart);
return 0L;
}
// Fire offset request
OffsetRequest request = new OffsetRequest(ImmutableMap.of(
new TopicAndPartition(topicPart.getTopic(), topicPart.getPartition()),
new PartitionOffsetRequestInfo(timestamp, 1)
), kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
OffsetResponse response = consumer.getOffsetsBefore(request);
// Retrieve offsets from response
long[] offsets = response.hasError() ? null : response.offsets(topicPart.getTopic(), topicPart.getPartition());
if (offsets == null || offsets.length <= 0) {
short errorCode = response.errorCode(topicPart.getTopic(), topicPart.getPartition());
// If the topic partition doesn't exists, use offset 0 without logging error.
if (errorCode != ErrorMapping.UnknownTopicOrPartitionCode()) {
consumers.refresh(brokerInfo);
LOG.warn("Failed to fetch offset for {} with timestamp {}. Error: {}. Default offset to 0.",
topicPart, timestamp, errorCode);
}
return 0L;
}
LOG.debug("Offset {} fetched for {} with timestamp {}.", offsets[0], topicPart, timestamp);
return offsets[0];
}
  /**
   * A preparer that uses the Kafka scala api for consuming messages.
   */
  private final class SimplePreparer implements Preparer {
    // Map from TopicPartition to the requested starting offset. Negative values are the special
    // kafka.api.OffsetRequest earliest/latest constants, resolved later by the consumer thread.
    private final Map<TopicPartition, Long> requests;
    // Factory for the single daemon thread that runs MessageCallback invocations.
    private final ThreadFactory threadFactory;

    private SimplePreparer() {
      this.requests = Maps.newHashMap();
      this.threadFactory = Threads.createDaemonThreadFactory("message-callback-%d");
    }
@Override
public Preparer add(String topic, int partition, long offset) {
requests.put(new TopicPartition(topic, partition), offset);
return this;
}
@Override
public Preparer addFromBeginning(String topic, int partition) {
TopicPartition topicPartition = new TopicPartition(topic, partition);
requests.put(topicPartition, kafka.api.OffsetRequest.EarliestTime());
return this;
}
@Override
public Preparer addLatest(String topic, int partition) {
TopicPartition topicPartition = new TopicPartition(topic, partition);
requests.put(topicPartition, kafka.api.OffsetRequest.LatestTime());
return this;
}
    @Override
    public Cancellable consume(MessageCallback callback) {
      // Single-thread executor so callback invocations are serialized, never concurrent.
      final ExecutorService executor = Executors.newSingleThreadExecutor(threadFactory);
      final List<ConsumerThread> pollers = Lists.newArrayList();

      // When cancelling the consumption, first terminates all polling threads and then stop the executor service.
      final AtomicBoolean cancelled = new AtomicBoolean();
      Cancellable cancellable = new Cancellable() {
        @Override
        public void cancel() {
          // Guarantees the shutdown sequence below runs at most once, even if called concurrently.
          if (!cancelled.compareAndSet(false, true)) {
            return;
          }
          // De-register from the owner so stop() won't cancel this again.
          consumerCancels.remove(this);

          LOG.info("Requesting stop of all consumer threads.");
          for (ConsumerThread consumerThread : pollers) {
            consumerThread.terminate();
          }
          LOG.info("Wait for all consumer threads to stop.");
          for (ConsumerThread consumerThread : pollers) {
            try {
              consumerThread.join();
            } catch (InterruptedException e) {
              LOG.warn("Interrupted exception while waiting for thread to complete.", e);
            }
          }
          LOG.info("All consumer threads stopped.");
          // Use shutdown so that submitted task still has chance to execute, which is important for finished to get
          // called.
          executor.shutdown();
        }
      };

      // Wrap the callback with a single thread executor.
      MessageCallback messageCallback = wrapCallback(callback, executor, cancellable);

      // Starts threads for polling new messages, one per requested topic partition.
      for (Map.Entry<TopicPartition, Long> entry : requests.entrySet()) {
        ConsumerThread consumerThread = new ConsumerThread(entry.getKey(), entry.getValue(), messageCallback);
        consumerThread.setDaemon(true);
        consumerThread.start();

        pollers.add(consumerThread);
      }
      // Register with the owner so stop() can cancel this consumption.
      consumerCancels.add(cancellable);
      return cancellable;
    }
    /**
     * Wraps a given MessageCallback with an executor so that calls are executed in the given executor.
     * By running the calls through the executor, it also blocks and waits for each task to complete, so that
     * the poller thread is throttled to the rate of processing that the callback can handle.
     *
     * @param callback the user callback to wrap
     * @param executor single-thread executor that serializes all callback invocations
     * @param cancellable cancelled after {@code finished()} to stop all polling threads
     * @return a MessageCallback that delegates to {@code callback} through {@code executor}
     */
    private MessageCallback wrapCallback(final MessageCallback callback,
                                         final ExecutorService executor, final Cancellable cancellable) {
      // Set once finished() has been called; suppresses any further deliveries.
      final AtomicBoolean stopped = new AtomicBoolean();

      return new MessageCallback() {
        @Override
        public long onReceived(final Iterator<FetchedMessage> messages) {
          if (stopped.get()) {
            return -1L;
          }
          // Submit and block until the callback completes, intentionally throttling the poller thread.
          return Futures.getUnchecked(executor.submit(new Callable<Long>() {
            @Override
            public Long call() {
              // Re-check inside the executor: finished() may have raced in before this task ran.
              if (stopped.get()) {
                return -1L;
              }
              return callback.onReceived(messages);
            }
          }));
        }

        @Override
        public void finished() {
          // Make sure finished only get called once.
          if (!stopped.compareAndSet(false, true)) {
            return;
          }
          Futures.getUnchecked(executor.submit(new Runnable() {
            @Override
            public void run() {
              // When finished is called, also cancel the consumption from all polling thread.
              callback.finished();
              cancellable.cancel();
            }
          }));
        }
      };
    }
}
  /**
   * The thread for polling kafka.
   */
  private final class ConsumerThread extends Thread {
    private final TopicPartition topicPart;
    // Initial offset to fetch from; negative values are the special kafka.api.OffsetRequest
    // earliest/latest constants, resolved in run() via getLastOffset.
    private final long startOffset;
    private final MessageCallback callback;
    // Reused for every delivered message to avoid per-message allocation; see createFetchedMessages.
    private final BasicFetchedMessage fetchedMessage;
    // Polling loop flag; cleared by terminate() from another thread, hence volatile.
    private volatile boolean running;

    private ConsumerThread(TopicPartition topicPart, long startOffset, MessageCallback callback) {
      super(String.format("Kafka-Consumer-%s-%d", topicPart.getTopic(), topicPart.getPartition()));
      this.topicPart = topicPart;
      this.startOffset = startOffset;
      this.callback = callback;
      this.running = true;
      this.fetchedMessage = new BasicFetchedMessage(topicPart);
    }
    @Override
    public void run() {
      // Current fetch offset; shared with the iterator created in invokeCallback so skipped/consumed
      // positions are visible here.
      final AtomicLong offset = new AtomicLong(startOffset);
      Map.Entry<BrokerInfo, SimpleConsumer> consumerEntry = null;
      ExponentialBackoff backoff = new ExponentialBackoff(INIT_CONSUMER_FAILURE_BACKOFF,
                                                          MAX_CONSUMER_FAILURE_BACKOFF, TimeUnit.MILLISECONDS);
      while (running) {
        // (Re)acquire the leader broker and its consumer when we don't currently have one.
        if (consumerEntry == null && (consumerEntry = getConsumerEntry()) == null) {
          LOG.debug("No leader for topic partition {}.", topicPart);
          backoff.backoff();
          continue;
        }

        try {
          // If offset < 0, meaning it's special offset value that needs to fetch either the earliest or latest offset
          // from Kafka server.
          long off = offset.get();
          if (off < 0) {
            offset.set(getLastOffset(topicPart, off));
          }

          SimpleConsumer consumer = consumerEntry.getValue();

          // Fire a fetch message request
          FetchResponse response = fetchMessages(consumer, offset.get());

          // Failure response, set consumer entry to null and let next round of loop to handle it.
          if (response.hasError()) {
            short errorCode = response.errorCode(topicPart.getTopic(), topicPart.getPartition());
            LOG.info("Failed to fetch message on {}. Error: {}", topicPart, errorCode);
            // If it is out of range error, reset to earliest offset
            if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) {
              offset.set(kafka.api.OffsetRequest.EarliestTime());
            }

            consumers.refresh(consumerEntry.getKey());
            consumerEntry = null;
            backoff.backoff();
            continue;
          }

          ByteBufferMessageSet messages = response.messageSet(topicPart.getTopic(), topicPart.getPartition());
          if (sleepIfEmpty(messages)) {
            continue;
          }

          // Call the callback
          invokeCallback(messages, offset);
          backoff.reset();
        } catch (Throwable t) {
          // Only log if it is still running; otherwise, it's just the interrupt caused by the stop.
          if (!running) {
            LOG.debug("Unable to fetch messages on {}, kafka consumer service shutdown is in progress.", topicPart);
          } else {
            if (t instanceof ClosedByInterruptException || t instanceof ConnectException) {
              LOG.debug("Unable to fetch messages on {}, kafka server shutdown is in progress.", topicPart);
            } else {
              LOG.info("Exception when fetching message on {}.", topicPart, t);
            }
            backoff.backoff();
          }
          // Force re-resolution of the leader on the next iteration.
          consumers.refresh(consumerEntry.getKey());
          consumerEntry = null;
        }
      }

      // When the thread is done, call the callback finished method.
      try {
        callback.finished();
      } catch (Throwable t) {
        LOG.error("Exception thrown from MessageCallback.finished({})", running, t);
      }
    }
    /**
     * Requests this polling thread to stop: clears the running flag, then interrupts any blocking
     * sleep or fetch so the run loop can observe the flag and exit promptly.
     */
    public void terminate() {
      LOG.info("Terminate requested {}", getName());
      // Clear the flag before interrupting so the woken run loop sees running == false.
      running = false;
      interrupt();
    }
/**
* Gets the leader broker and the associated SimpleConsumer for the current topic and partition.
*/
private Map.Entry<BrokerInfo, SimpleConsumer> getConsumerEntry() {
BrokerInfo leader = brokerService.getLeader(topicPart.getTopic(), topicPart.getPartition());
return leader == null ? null : Maps.immutableEntry(leader, consumers.getUnchecked(leader));
}
/**
* Makes a call to kafka to fetch messages.
*/
private FetchResponse fetchMessages(SimpleConsumer consumer, long offset) {
FetchRequest request = new FetchRequestBuilder()
.clientId(consumer.clientId())
.addFetch(topicPart.getTopic(), topicPart.getPartition(), offset, FETCH_SIZE)
.maxWait(MAX_WAIT)
.build();
return consumer.fetch(request);
}
/**
* Sleeps if the message set is empty.
* @return {@code true} if it is empty, {@code false} otherwise.
*/
private boolean sleepIfEmpty(ByteBufferMessageSet messages) {
if (Iterables.isEmpty(messages)) {
LOG.trace("No message fetched. Sleep for {} ms before next fetch.", EMPTY_FETCH_WAIT);
try {
TimeUnit.MILLISECONDS.sleep(EMPTY_FETCH_WAIT);
} catch (InterruptedException e) {
// It's interrupted from stop, ok to ignore.
}
return true;
}
return false;
}
/**
* Calls the message callback with the given message set.
*/
private void invokeCallback(ByteBufferMessageSet messages, AtomicLong offset) {
long savedOffset = offset.get();
try {
offset.set(callback.onReceived(createFetchedMessages(messages, offset)));
} catch (Throwable t) {
LOG.error("Callback throws exception. Retry from offset {} for {}", startOffset, topicPart, t);
offset.set(savedOffset);
}
}
    /**
     * Creates an Iterator of FetchedMessage based on the given message set. The iterator reads the
     * given offset to skip any message older than the currently expected offset.
     *
     * NOTE(review): every call to the iterator returns the same reused {@code fetchedMessage}
     * instance with its fields overwritten; callers must not retain it across iterations.
     */
    private Iterator<FetchedMessage> createFetchedMessages(ByteBufferMessageSet messageSet, final AtomicLong offset) {
      final Iterator<MessageAndOffset> messages = messageSet.iterator();
      return new AbstractIterator<FetchedMessage>() {
        @Override
        protected FetchedMessage computeNext() {
          while (messages.hasNext()) {
            MessageAndOffset message = messages.next();
            long msgOffset = message.offset();
            // Skip messages the broker returned that are below the requested offset.
            if (msgOffset < offset.get()) {
              LOG.trace("Received old offset {}, expecting {} on {}. Message Ignored.",
                        msgOffset, offset.get(), topicPart);
              continue;
            }

            // Overwrite the shared instance in place with this message's payload and offsets.
            fetchedMessage.setPayload(message.message().payload());
            fetchedMessage.setOffset(message.offset());
            fetchedMessage.setNextOffset(message.nextOffset());

            return fetchedMessage;
          }
          return endOfData();
        }
      };
    }
/**
* Helper class for performance exponential backoff on message fetching failure.
*/
private final class ExponentialBackoff {
private final long initialBackoff;
private final long maxBackoff;
private final TimeUnit backoffUnit;
private int failureCount = 0;
private ExponentialBackoff(long initialBackoff, long maxBackoff, TimeUnit backoffUnit) {
this.initialBackoff = initialBackoff;
this.maxBackoff = maxBackoff;
this.backoffUnit = backoffUnit;
}
void backoff() {
failureCount++;
long multiplier = failureCount > Long.SIZE ? Long.MAX_VALUE : (1L << (failureCount - 1));
long backoff = Math.min(initialBackoff * multiplier, maxBackoff);
backoff = backoff < 0 ? maxBackoff : backoff;
try {
backoffUnit.sleep(backoff);
} catch (InterruptedException e) {
// OK to ignore since this method is called from the consumer thread only, which on thread shutdown,
// the thread will be interrupted
}
}
void reset() {
failureCount = 0;
}
}
}
}