blob: 0c0059f66f85d9dfa58195572751006daa7b71fc [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.kafka.clients.consumer;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarConsumerKafkaConfig;
import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
/**
 * Kafka {@link Consumer} implementation that consumes from Pulsar.
 *
 * <p>Each subscribed topic partition is served by its own Pulsar consumer, registered in {@code consumers}.
 * This class is also the Pulsar {@link MessageListener}: delivered messages are queued and then handed out
 * by {@link #poll(long)}.
 */
@Slf4j // generates the "log" field used by the logging calls in this class
public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListener<byte[]> {
// NOTE(review): this class does not visibly implement Serializable, so this constant appears unused.
private static final long serialVersionUID = 1L;
/** Pulsar client from which every per-partition consumer is built. */
private final PulsarClient client;
private final Deserializer<K> keyDeserializer;
private final Deserializer<V> valueDeserializer;
/** Kafka consumer group id, read from {@code group.id}. */
private final String groupId;
/** Whether offsets are committed automatically after each non-empty poll. */
private final boolean isAutoCommit;
/** One Pulsar consumer per subscribed Kafka topic partition. */
private final ConcurrentMap<TopicPartition, org.apache.pulsar.client.api.Consumer<byte[]>> consumers = new ConcurrentHashMap<>();
/** Offset of the last record handed to the application, per partition; the source of implicit commits. */
private final Map<TopicPartition, Long> lastReceivedOffset = new ConcurrentHashMap<>();
/** Offsets passed to the most recent commit, per partition; served by {@code committed(...)}. */
private final Map<TopicPartition, OffsetAndMetadata> lastCommittedOffset = new ConcurrentHashMap<>();
// NOTE(review): presumably the partitions subscribed but not yet polled (populated outside this view); unlike
// the maps above this is a plain HashSet — confirm it is only touched from the polling thread.
private final Set<TopicPartition> unpolledPartitions = new HashSet<>();
/** Initial position derived from {@code auto.offset.reset}. */
private final SubscriptionInitialPosition strategy;
/** Configured consumer interceptors; an empty list when none are configured. */
private List<ConsumerInterceptor<K, V>> interceptors;
/** Set on close so the listener can tell an expected interrupt from an unexpected one. */
private volatile boolean closed = false;
/** Upper bound on the number of records returned by a single poll. */
private final int maxRecordsInSinglePoll;
/** Copy of the original configuration, used to configure the Pulsar client and consumer builders. */
private final Properties properties;
private static class QueueItem {
final org.apache.pulsar.client.api.Consumer<byte[]> consumer;
final Message<byte[]> message;
QueueItem(org.apache.pulsar.client.api.Consumer<byte[]> consumer, Message<byte[]> message) {
this.consumer = consumer;
this.message = message;
/** Capacity of the hand-off queue between the Pulsar listener callbacks and {@link #poll(long)}. */
private static final int RECEIVE_QUEUE_CAPACITY = 1000;
// Since a single Kafka consumer can receive from multiple topics, we need to multiplex all the different
// topics/partitions into a single queue. The queue is bounded: received() uses put(), so the Pulsar listener
// thread blocks while the queue is full, which applies back-pressure when the application polls slowly.
private final BlockingQueue<QueueItem> receivedMessages = new ArrayBlockingQueue<>(RECEIVE_QUEUE_CAPACITY);
/**
 * Creates a consumer from Kafka-style configuration. The key and value deserializers are instantiated from
 * the classes named by {@code key.deserializer} and {@code value.deserializer}.
 *
 * @param configs Kafka consumer configuration
 */
public PulsarKafkaConsumer(Map<String, Object> configs) {
    this(configs, null, null);
}

/**
 * Creates a consumer from Kafka-style configuration with explicit deserializers.
 *
 * @param configs           Kafka consumer configuration
 * @param keyDeserializer   key deserializer, or {@code null} to instantiate the configured class
 * @param valueDeserializer value deserializer, or {@code null} to instantiate the configured class
 */
public PulsarKafkaConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer,
        Deserializer<V> valueDeserializer) {
    this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(configs, keyDeserializer, valueDeserializer)),
            keyDeserializer, valueDeserializer);
}

/**
 * Creates a consumer from Kafka-style properties. The key and value deserializers are instantiated from the
 * classes named by {@code key.deserializer} and {@code value.deserializer}.
 *
 * @param properties Kafka consumer configuration
 */
public PulsarKafkaConsumer(Properties properties) {
    this(properties, null, null);
}

/**
 * Creates a consumer from Kafka-style properties with explicit deserializers.
 *
 * @param properties        Kafka consumer configuration
 * @param keyDeserializer   key deserializer, or {@code null} to instantiate the configured class
 * @param valueDeserializer value deserializer, or {@code null} to instantiate the configured class
 */
public PulsarKafkaConsumer(Properties properties, Deserializer<K> keyDeserializer,
        Deserializer<V> valueDeserializer) {
    this(new ConsumerConfig(ConsumerConfig.addDeserializerToConfig(properties, keyDeserializer, valueDeserializer)),
            keyDeserializer, valueDeserializer);
}
/**
 * Shared constructor behind the public constructors. It resolves the key and value deserializers, reads the
 * consumer settings from {@code config}, and builds the Pulsar client.
 *
 * @param config            Kafka consumer configuration
 * @param keyDeserializer   explicit key deserializer, or null to instantiate the configured class
 * @param valueDeserializer explicit value deserializer, or null to instantiate the configured class
 * @throws RuntimeException wrapping a PulsarClientException if the Pulsar client cannot be built
 *
 * NOTE(review): this view of the body is missing lines and does not compile as shown:
 * - both getConfiguredInstance calls end in a dangling comma;
 * - text that looks like the arguments of a dropped logging call trails the strategy assignment;
 * - an "else" near maxRecordsInSinglePoll has no matching "if";
 * - "= new Properties();" has no left-hand side;
 * - closing braces are absent.
 * Confirm against the canonical source.
 */
private PulsarKafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer) {
// Prefer the deserializer supplied by the caller; otherwise instantiate and configure the configured class.
if (keyDeserializer == null) {
this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
this.keyDeserializer.configure(config.originals(), true);
} else {
this.keyDeserializer = keyDeserializer;
// NOTE(review): Kafka's Deserializer.configure(configs, isKey) expects isKey=false for a value
// deserializer, but the value branch below passes true — confirm and fix.
if (valueDeserializer == null) {
this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
this.valueDeserializer.configure(config.originals(), true);
} else {
this.valueDeserializer = valueDeserializer;
groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
isAutoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
// Map auto.offset.reset to a Pulsar initial position (see getStrategy).
strategy = getStrategy(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));"Offset reset strategy has been assigned value {}", strategy);
// Only the first bootstrap.servers entry is used, as the Pulsar service URL for the client built below.
String serviceUrl = config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
// If MAX_POLL_RECORDS_CONFIG is provided then use the config, else use default value.
// NOTE(review): the "if" that should guard the next assignment is not visible in this view.
maxRecordsInSinglePoll = config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
} else {
maxRecordsInSinglePoll = 1000;
// Instantiate configured ConsumerInterceptors (raw List cast from the Kafka config helper).
// NOTE(review): "= new Properties();" below has no visible target; presumably this.properties — confirm.
interceptors = (List) config.getConfiguredInstances(
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptor.class); = new Properties();
// Copy every original config entry into properties, which configures the Pulsar client builder.
config.originals().forEach((k, v) -> properties.put(k, v));
ClientBuilder clientBuilder = PulsarClientKafkaConfig.getClientBuilder(properties);
// Since this client instance is going to be used just for the consumers, we can enable Nagle to group
// all the acknowledgments sent to broker within a short time frame
// NOTE(review): no builder call follows the comment above in this view (presumably one that disables
// TCP no-delay) — confirm.
try {
client = clientBuilder.serviceUrl(serviceUrl).build();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
private SubscriptionInitialPosition getStrategy(final String strategy) {
switch(strategy) {
case "earliest":
return SubscriptionInitialPosition.Earliest;
return SubscriptionInitialPosition.Latest;
public void received(org.apache.pulsar.client.api.Consumer<byte[]> consumer, Message<byte[]> msg) {
// Block listener thread if the application is slowing down
try {
receivedMessages.put(new QueueItem(consumer, msg));
} catch (InterruptedException e) {
if (closed) {
// Consumer was closed and the thread was interrupted. Nothing to worry about here
} else {
throw new RuntimeException(e);
public Set<TopicPartition> assignment() {
throw new UnsupportedOperationException("Cannot access the partitions assignements");
* Get the current subscription. Will return the same topics used in the most recent call to
* {@link #subscribe(Collection, ConsumerRebalanceListener)}, or an empty set if no such call has been made.
* @return The set of topics currently subscribed to
public Set<String> subscription() {
return consumers.keySet().stream().map(TopicPartition::topic).collect(Collectors.toSet());
public void subscribe(Collection<String> topics) {
subscribe(topics, null);
/**
 * Subscribes to the given topics. Each partition of a partitioned topic (partition count greater than 1)
 * gets its own Pulsar consumer; otherwise a single consumer is created for the topic. Each consumer is
 * registered in {@code consumers} once its subscription future completes.
 *
 * @param topics   topics to subscribe to
 * @param callback optional rebalance listener; may be null
 * @throws RuntimeException wrapping any failure raised while subscribing
 *
 * NOTE(review): this view of the body is missing lines and does not compile as shown:
 * - the continuations of the consumerBuilder.clone()/topic(topic) chains;
 * - the arguments of both TopicPartition constructors;
 * - the logging call whose arguments appear after each lambda's opening brace;
 * - the wait on the futures and the callback invocation;
 * - the close call inside the cleanup loop;
 * - closing braces.
 * Confirm against the canonical source.
 */
public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
List<CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>>> futures = new ArrayList<>();
List<TopicPartition> topicPartitions = new ArrayList<>();
try {
for (String topic : topics) {
// Create individual subscription on each partition, that way we can keep using the
// acknowledgeCumulative()
// Blocks until the partition count of the topic is known.
int numberOfPartitions = ((PulsarClientImpl) client).getNumberOfPartitions(topic).get();
ConsumerBuilder<byte[]> consumerBuilder = PulsarConsumerKafkaConfig.getConsumerBuilder(client, properties);
if (numberOfPartitions > 1) {
// Subscribe to each partition
for (int i = 0; i < numberOfPartitions; i++) {
String partitionName = TopicName.get(topic).getPartition(i).toString();
CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.clone()
int partitionIndex = i;
TopicPartition tp = new TopicPartition(
// Register the consumer under its partition once subscribed; an existing registration is kept.
futures.add(future.thenApply(consumer -> {"Add consumer {} for partition {}", consumer, tp);
consumers.putIfAbsent(tp, consumer);
return consumer;
} else {
// Topic has a single partition
CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.topic(topic)
TopicPartition tp = new TopicPartition(
// Register the consumer once subscribed; an existing registration is kept.
futures.add(future.thenApply(consumer -> {"Add consumer {} for partition {}", consumer, tp);
consumers.putIfAbsent(tp, consumer);
return consumer;
// Wait for all consumers to be ready
// Notify the listener that it now owns all topics/partitions
if (callback != null) {
} catch (Exception e) {
// Close all consumers that might have been successfully created
futures.forEach(f -> {
try {
} catch (Exception e1) {
// Ignore. Consumer already had failed
throw new RuntimeException(e);
public void assign(Collection<TopicPartition> partitions) {
throw new UnsupportedOperationException("Cannot manually assign partitions");
public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
throw new UnsupportedOperationException("Cannot subscribe with topic name pattern");
public void unsubscribe() {
consumers.values().forEach(c -> {
try {
} catch (PulsarClientException e) {
throw new RuntimeException(e);
/**
 * Returns records delivered by the Pulsar listeners, waiting up to {@code timeoutMillis} for the first one.
 *
 * <p>Once a first item is available, further items are taken only if they are already queued (zero-wait
 * poll). For each message, this method:
 * <ul>
 *   <li>maps it back to a Kafka topic and partition;</li>
 *   <li>deserializes its key and value;</li>
 *   <li>records the partition's last received offset.</li>
 * </ul>
 * The interceptors' onConsume hooks are then applied to the result.
 *
 * @param timeoutMillis maximum time to wait for the first record, in milliseconds
 * @return the polled records, or ConsumerRecords.EMPTY if nothing arrived in time
 * @throws RuntimeException wrapping an InterruptedException raised while waiting
 *
 * NOTE(review): this view of the body is missing lines and does not compile as shown:
 * - the body of the invalid-offset branch (only the arguments of a logging call remain);
 * - the statement guarded by the maxRecordsInSinglePoll check (presumably a break);
 * - the commit call guarded by the auto-commit check;
 * - closing braces.
 * Confirm against the canonical source.
 */
public ConsumerRecords<K, V> poll(long timeoutMillis) {
try {
// Only the first item is waited for; later items are taken with a zero timeout below.
QueueItem item = receivedMessages.poll(timeoutMillis, TimeUnit.MILLISECONDS);
if (item == null) {
return (ConsumerRecords<K, V>) ConsumerRecords.EMPTY;
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
int numberOfRecords = 0;
while (item != null) {
// Translate the Pulsar topic name back to a Kafka topic and partition index (0 when not partitioned).
TopicName topicName = TopicName.get(item.consumer.getTopic());
String topic = topicName.getPartitionedTopicName();
int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0;
Message<byte[]> msg = item.message;
// Derive a Kafka-style long offset from the Pulsar message id.
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
long offset = MessageIdUtils.getOffset(msgId);
TopicPartition tp = new TopicPartition(topic, partition);
// A partition with no recorded offset that is not listed as unpolled is treated as invalid and reset.
if (lastReceivedOffset.get(tp) == null && !unpolledPartitions.contains(tp)) {"When polling offsets, invalid offsets were detected. Resetting topic partition {}", tp);
K key = getKey(topic, msg);
V value = valueDeserializer.deserialize(topic, msg.getData());
// Default to the broker publish time; an event time set by the producer takes precedence below.
TimestampType timestampType = TimestampType.LOG_APPEND_TIME;
long timestamp = msg.getPublishTime();
if (msg.getEventTime() > 0) {
// If we have Event time, use that in preference
timestamp = msg.getEventTime();
timestampType = TimestampType.CREATE_TIME;
// Checksum is reported as -1. NOTE(review): the key size is the key String's length in chars, not the
// serialized byte length — confirm this is acceptable.
ConsumerRecord<K, V> consumerRecord = new ConsumerRecord<>(topic, partition, offset, timestamp,
timestampType, -1, msg.hasKey() ? msg.getKey().length() : 0, msg.getData().length, key, value);
records.computeIfAbsent(tp, k -> new ArrayList<>()).add(consumerRecord);
// Update last offset seen by application
lastReceivedOffset.put(tp, offset);
if (++numberOfRecords >= maxRecordsInSinglePoll) {
// Check if we have an item already available
item = receivedMessages.poll(0, TimeUnit.MILLISECONDS);
if (isAutoCommit && !records.isEmpty()) {
// Commit the offset of previously dequeued messages
// If no interceptor is provided, the interceptors list is empty and the original ConsumerRecords are returned.
return applyConsumerInterceptorsOnConsume(interceptors, new ConsumerRecords<>(records));
} catch (InterruptedException e) {
throw new RuntimeException(e);
private K getKey(String topic, Message<byte[]> msg) {
if (!msg.hasKey()) {
return null;
if (keyDeserializer instanceof StringDeserializer) {
return (K) msg.getKey();
} else {
// Assume base64 encoding
byte[] data = Base64.getDecoder().decode(msg.getKey());
return keyDeserializer.deserialize(topic, data);
public void commitSync() {
try {
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
try {
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
public void commitAsync() {
public void commitAsync(OffsetCommitCallback callback) {
Map<TopicPartition, OffsetAndMetadata> offsets = getCurrentOffsetsMap();
doCommitOffsets(offsets).handle((v, throwable) -> {
callback.onComplete(offsets, throwable != null ? new Exception(throwable) : null);
return null;
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
doCommitOffsets(offsets).handle((v, throwable) -> {
callback.onComplete(offsets, throwable != null ? new Exception(throwable) : null);
return null;
private CompletableFuture<Void> doCommitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
List<CompletableFuture<Void>> futures = new ArrayList<>();
applyConsumerInterceptorsOnCommit(interceptors, offsets);
offsets.forEach((topicPartition, offsetAndMetadata) -> {
org.apache.pulsar.client.api.Consumer<byte[]> consumer = consumers.get(topicPartition);
lastCommittedOffset.put(topicPartition, offsetAndMetadata);
return FutureUtil.waitForAll(futures);
private Map<TopicPartition, OffsetAndMetadata> getCurrentOffsetsMap() {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
lastReceivedOffset.forEach((topicPartition, offset) -> {
OffsetAndMetadata om = new OffsetAndMetadata(offset);
offsets.put(topicPartition, om);
return offsets;
* Apply all onConsume methods in a list of ConsumerInterceptors.
* Catch any exception during the process.
* @param interceptors Interceptors provided.
* @param consumerRecords ConsumerRecords returned by calling {@link this#poll(long)}.
* @return ConsumerRecords after applying all ConsumerInterceptor in interceptors list.
private ConsumerRecords applyConsumerInterceptorsOnConsume(List<ConsumerInterceptor<K, V>> interceptors, ConsumerRecords consumerRecords) {
ConsumerRecords processedConsumerRecords = consumerRecords;
for (ConsumerInterceptor interceptor : interceptors) {
try {
processedConsumerRecords = interceptor.onConsume(processedConsumerRecords);
} catch (Exception e) {
log.warn("Error executing onConsume for interceptor {}.", interceptor.getClass().getCanonicalName(), e);
return processedConsumerRecords;
* Apply all onCommit methods in a list of ConsumerInterceptors.
* Catch any exception during the process.
* @param interceptors Interceptors provided.
* @param offsets Offsets need to be commit.
private void applyConsumerInterceptorsOnCommit(List<ConsumerInterceptor<K, V>> interceptors, Map<TopicPartition, OffsetAndMetadata> offsets) {
for (ConsumerInterceptor interceptor : interceptors) {
try {
} catch (Exception e) {
log.warn("Error executing onCommit for interceptor {}.", interceptor.getClass().getCanonicalName(), e);
public void seek(TopicPartition partition, long offset) {
MessageId msgId = MessageIdUtils.getMessageId(offset);
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(partition);
if (c == null) {
throw new IllegalArgumentException("Cannot seek on a partition where we are not subscribed");
try {;
} catch (PulsarClientException e) {
throw new RuntimeException(e);
/**
 * Intended, per its name, to move the given partitions to the beginning of their topics. When the
 * collection is empty, every partition with a consumer in this instance is used instead. The seek call
 * itself is not visible in this view.
 *
 * @param partitions partitions to reposition; an empty collection selects all partitions in consumers
 *
 * NOTE(review): this view is missing lines and does not compile as shown:
 * - the call wrapping the IllegalArgumentException (its three closing parentheses suggest it was added to
 *   futures as a failed future);
 * - the body of the else branch;
 * - any wait on futures;
 * - closing braces.
 * As shown, futures is never filled or awaited. Confirm against the canonical source.
 */
public void seekToBeginning(Collection<TopicPartition> partitions) {
List<CompletableFuture<Void>> futures = new ArrayList<>();
// An empty argument selects every partition this instance holds a consumer for.
if (partitions.isEmpty()) {
partitions = consumers.keySet();
for (TopicPartition tp : partitions) {
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(tp);
// A partition without a consumer yields the exception below (how it is surfaced is not visible here).
if (c == null) {
new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
} else {
/**
 * Intended, per its name, to move the given partitions to the end of their topics. When the collection is
 * empty, every partition with a consumer in this instance is used instead. The seek call itself is not
 * visible in this view.
 *
 * @param partitions partitions to reposition; an empty collection selects all partitions in consumers
 *
 * NOTE(review): this view is missing lines and does not compile as shown:
 * - the call wrapping the IllegalArgumentException (its three closing parentheses suggest it was added to
 *   futures as a failed future);
 * - the body of the else branch;
 * - any wait on futures;
 * - closing braces.
 * As shown, futures is never filled or awaited. Confirm against the canonical source.
 */
public void seekToEnd(Collection<TopicPartition> partitions) {
List<CompletableFuture<Void>> futures = new ArrayList<>();
// An empty argument selects every partition this instance holds a consumer for.
if (partitions.isEmpty()) {
partitions = consumers.keySet();
for (TopicPartition tp : partitions) {
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(tp);
// A partition without a consumer yields the exception below (how it is surfaced is not visible here).
if (c == null) {
new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
} else {
public long position(TopicPartition partition) {
Long offset = lastReceivedOffset.get(partition);
if (offset == null && !unpolledPartitions.contains(partition)) {
return resetOffsets(partition).getValue();
return unpolledPartitions.contains(partition) ? 0 : offset;
private SubscriptionInitialPosition resetOffsets(final TopicPartition partition) {"Resetting partition {} and seeking to {} position", partition, strategy);
if (strategy == SubscriptionInitialPosition.Earliest) {
} else {
return strategy;
public OffsetAndMetadata committed(TopicPartition partition) {
return lastCommittedOffset.get(partition);
public Map<MetricName, ? extends Metric> metrics() {
throw new UnsupportedOperationException();
public List<PartitionInfo> partitionsFor(String topic) {
throw new UnsupportedOperationException();
public Map<String, List<PartitionInfo>> listTopics() {
throw new UnsupportedOperationException();
public Set<TopicPartition> paused() {
throw new UnsupportedOperationException();
public void pause(Collection<TopicPartition> partitions) {
throw new UnsupportedOperationException();
public void resume(Collection<TopicPartition> partitions) {
throw new UnsupportedOperationException();
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
throw new UnsupportedOperationException();
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
throw new UnsupportedOperationException();
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
throw new UnsupportedOperationException();
public void close() {
/**
 * Marks this consumer closed and closes the Pulsar client, waiting at most {@code timeout}.
 *
 * @param timeout maximum time to wait for the client to close
 * @param unit    unit of {@code timeout}
 * @throws RuntimeException wrapping an interruption, a client-close failure, or a timeout
 *
 * NOTE(review): the statement guarded by {@code if (isAutoCommit)} is not visible in this view (presumably
 * a final commit — confirm), and neither are the closing braces.
 */
public void close(long timeout, TimeUnit unit) {
try {
// Set first so the message listener treats a subsequent interrupt as expected.
closed = true;
if (isAutoCommit) {
client.closeAsync().get(timeout, unit);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new RuntimeException(e);
public void wakeup() {
throw new UnsupportedOperationException();