* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.kafka.clients.simple.consumer;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.producer.PulsarClientKafkaConfig;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.util.MessageIdUtils;
import org.apache.pulsar.common.naming.TopicName;
import kafka.api.FetchRequest;
import kafka.api.PartitionFetchInfo;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.OffsetMetadataAndError;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetCommitResponse;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import lombok.extern.slf4j.Slf4j;
* Note: <br/>
* - SimpleConsumer doesn't work well with pulsar-batch messages because client-app uses raw-offset to commit offset for
* a given group-id. <br/>
* - In order to work with partitioned-topic and batch messages: use custom api
* {@link #PulsarMsgAndOffset::getFullOffset()} to fetch offset with {@link MessageId} and commit offset with the same
* message-Id {@link PulsarOffsetMetadataAndError::PulsarOffsetMetadataAndError(messageId..)}.
public class PulsarKafkaSimpleConsumer extends SimpleConsumer {
private final String host;
private final int port;
private final String clientId;
private final PulsarClient client;
private final PulsarAdmin admin;
private final Map<TopicGroup, Consumer<byte[]>> topicConsumerMap;
private final SubscriptionType subscriptionType;
public static final String SUBSCRIPTION_TYPE = "pulsar.subscription.type";
public static final String HTTP_SERVICE_URL = "pulsar.http.service.url";
public PulsarKafkaSimpleConsumer(String host, int port, int soTimeout, int bufferSize, String clientId) {
this(host, port, soTimeout, bufferSize, clientId, new Properties());
* @param host
* pulsar-broker service url
* @param port
* n/a
* @param soTimeout
* n/a
* @param bufferSize
* n/a
* @param clientId
* client-id
* @param properties
* properties to retrieve authentication params by {@link PulsarClientKafkaConfig}
public PulsarKafkaSimpleConsumer(String host, int port, int soTimeout, int bufferSize, String clientId,
Properties properties) {
super(host, port, soTimeout, bufferSize, clientId); = host;
this.port = port;
this.clientId = clientId;
try {
client = PulsarClientKafkaConfig.getClientBuilder(properties).serviceUrl(host).build();
} catch (PulsarClientException e) {
log.warn("Failed to create pulsar client for {} and properties {}", host, properties);
throw new RuntimeException("Failed to create pulsar client " + host, e);
try {
String url = properties.getProperty(HTTP_SERVICE_URL, host);
admin = PulsarClientKafkaConfig.getAdminBuilder(url, properties).build();
} catch (PulsarClientException e) {
log.warn("Failed to create pulsar admin for {} and properties {}", host, properties);
throw new RuntimeException("Failed to create pulsar admin " + host, e);
this.topicConsumerMap = new ConcurrentHashMap<>(8, 0.75f, 1);
this.subscriptionType = getSubscriptionType(properties);
public PulsarFetchResponse fetch(FetchRequest request) {
try {
Map<String, Reader<byte[]>> topicReaderMap = createTopicReaders(request);
return new PulsarFetchResponse(topicReaderMap, false);
} catch (Exception e) {
log.warn("Failed to process fetch request{}, {}", request, e.getMessage());
return new PulsarFetchResponse(null, true);
private Map<String, Reader<byte[]>> createTopicReaders(FetchRequest request) {
Map<String, Reader<byte[]>> topicReaderMap = Maps.newHashMap();
scala.collection.immutable.Map<String, scala.collection.immutable.Map<TopicAndPartition, PartitionFetchInfo>> reqInfo = request
Map<String, scala.collection.immutable.Map<TopicAndPartition, PartitionFetchInfo>> topicPartitionMap = scala.collection.JavaConverters
for (Entry<String, scala.collection.immutable.Map<TopicAndPartition, PartitionFetchInfo>> topicPartition : topicPartitionMap
.entrySet()) {
final String topicName = topicPartition.getKey();
Map<TopicAndPartition, PartitionFetchInfo> topicOffsetMap = scala.collection.JavaConverters
if (topicOffsetMap != null && !topicOffsetMap.isEmpty()) {
// pulsar-kafka adapter doesn't deal with partition so, assuming only 1 topic-metadata per topic name
Entry<TopicAndPartition, PartitionFetchInfo> topicOffset = topicOffsetMap.entrySet().iterator().next();
long offset = topicOffset.getValue().offset();
String topic = getTopicName(topicOffset.getKey());
MessageId msgId = getMessageId(offset);
try {
Reader<byte[]> reader = client.newReader().readerName(clientId).topic(topic).startMessageId(msgId)
.create();"Successfully created reader for {} at msg-id {}", topic, msgId);
topicReaderMap.put(topicName, reader);
} catch (PulsarClientException e) {
log.warn("Failed to create reader for topic {}", topic, e);
throw new RuntimeException("Failed to create reader for " + topic, e);
return topicReaderMap;
private MessageId getMessageId(long offset) {
if (kafka.api.OffsetRequest.EarliestTime() == offset) {
return MessageId.earliest;
} else if (kafka.api.OffsetRequest.LatestTime() == offset) {
return MessageId.latest;
} else {
return MessageIdUtils.getMessageId(offset);
public PulsarTopicMetadataResponse send(TopicMetadataRequest request) {
List<String> topics = request.topics();
PulsarTopicMetadataResponse response = new PulsarTopicMetadataResponse(admin, host, port, topics);
return response;
// It's @Overriden method of: OffsetResponse getOffsetsBefore(OffsetRequest or)
public PulsarOffsetResponse getOffsetsBefore(PulsarOffsetRequest or) {
Map<TopicAndPartition, PartitionOffsetRequestInfo> request = or.getRequestInfo();
Map<TopicAndPartition, Long> offsetResoponse = Maps.newHashMap();
for (Entry<TopicAndPartition, PartitionOffsetRequestInfo> topicPartitionRequest : request.entrySet()) {
TopicAndPartition topic = topicPartitionRequest.getKey();
long time = topicPartitionRequest.getValue().time();
if (time != kafka.api.OffsetRequest.EarliestTime() && time != kafka.api.OffsetRequest.LatestTime()) {
throw new IllegalArgumentException("Time has to be from EarliestTime or LatestTime");
offsetResoponse.put(topic, time);
return new PulsarOffsetResponse(offsetResoponse);
* <pre>
* Overriden method: OffsetCommitResponse commitOffsets(OffsetCommitRequest request)
* Note:
* created PulsarOffsetCommitResponse as OffsetCommitRequest doesn't provide getters
* </pre>
public OffsetCommitResponse commitOffsets(PulsarOffsetCommitRequest request) {
PulsarOffsetCommitResponse response = new PulsarOffsetCommitResponse(null);
for (Entry<String, MessageId> topicOffset : request.getTopicOffsetMap().entrySet()) {
final String topic = topicOffset.getKey();
final String groupId = request.getGroupId();
try {
Consumer<byte[]> consumer = getConsumer(topic, groupId);
} catch (Exception e) {
log.warn("Failed to ack message for topic {}-{}", topic, topicOffset.getValue(), e);
response.hasError = true;
TopicAndPartition topicPartition = new TopicAndPartition(topic, 0);
response.errors.computeIfAbsent(topicPartition, tp -> ErrorMapping.UnknownCode());
return response;
* <pre>
* Overriden method: OffsetFetchResponse fetchOffsets(OffsetFetchRequest request)
* Note:
* created PulsarOffsetFetchRequest as OffsetFetchRequest doesn't have getters for any field
* and PulsarOffsetFetchResponse created as base-class doesn't have setters to set state
* &#64;param request
* &#64;return
* </pre>
public PulsarOffsetFetchResponse fetchOffsets(PulsarOffsetFetchRequest request) {
final String groupId = request.groupId;
Map<TopicAndPartition, OffsetMetadataAndError> responseMap = Maps.newHashMap();
PulsarOffsetFetchResponse response = new PulsarOffsetFetchResponse(responseMap);
for (TopicAndPartition topicMetadata : request.requestInfo) {
final String topicName = getTopicName(topicMetadata);
try {
PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName, false);
CursorStats cursor = stats.cursors != null ? stats.cursors.get(groupId) : null;
if (cursor != null) {
String readPosition = cursor.readPosition;
MessageId msgId = null;
if (readPosition != null && readPosition.contains(":")) {
try {
String[] position = readPosition.split(":");
msgId = new MessageIdImpl(Long.parseLong(position[0]), Long.parseLong(position[1]), -1);
} catch (Exception e) {
log.warn("Invalid read-position {} for {}-{}", readPosition, topicName, groupId);
msgId = msgId == null ? MessageId.earliest : msgId;
OffsetMetadataAndError oE = new OffsetMetadataAndError(MessageIdUtils.getOffset(msgId), null,
responseMap.put(topicMetadata, oE);
} catch (Exception e) {
OffsetMetadataAndError oE = new OffsetMetadataAndError(0, null, ErrorMapping.UnknownCode());
responseMap.put(topicMetadata, oE);
return response;
public static String getTopicName(TopicAndPartition topicMetadata) {
return topicMetadata.partition() > -1
? TopicName.get(topicMetadata.topic()).getPartition(topicMetadata.partition()).toString()
: topicMetadata.topic();
public void close() {
if (topicConsumerMap != null) {
topicConsumerMap.forEach((topic, consumer) -> {
try {
} catch (PulsarClientException e) {
log.warn("Failed to close consumer for topic {}", topic, e);
if (client != null) {
try {
} catch (PulsarClientException e) {
log.warn("Failed to close pulsar-client ", e);
if (admin != null) {
try {
} catch (Exception e) {
log.warn("Failed to close pulsar-admin ", e);
private Consumer<byte[]> getConsumer(String topic, String groupId) {
TopicGroup topicGroup = new TopicGroup(topic, groupId);
return topicConsumerMap.computeIfAbsent(topicGroup, (topicName) -> {
try {
return client.newConsumer().topic(topic).subscriptionName(groupId).subscriptionType(subscriptionType)
} catch (PulsarClientException e) {
log.error("Failed to create consumer for topic {}", topic, e);
throw new RuntimeException("Failed to create consumer for topic " + topic, e);
public static class TopicGroup {
protected String topic;
protected String grouoId;
public TopicGroup(String topic, String grouoId) {
this.topic = topic;
this.grouoId = grouoId;
public int hashCode() {
return Objects.hash(topic, grouoId);
public boolean equals(Object obj) {
if (obj instanceof TopicGroup) {
TopicGroup t = (TopicGroup) obj;
return Objects.equals(topic, t.topic) && Objects.equals(grouoId, t.grouoId);
return false;
private SubscriptionType getSubscriptionType(Properties properties) {
String subType = properties != null && properties.contains(SUBSCRIPTION_TYPE)
? properties.getProperty(SUBSCRIPTION_TYPE)
: SubscriptionType.Failover.toString();
try {
return SubscriptionType.valueOf(subType);
} catch (IllegalArgumentException ie) {
return SubscriptionType.Failover;