title: Pulsar adaptor for Apache Kafka
tags: [apache, kafka, wrapper]

Pulsar provides an easy migration option for applications that are written against the Apache Kafka Java client API.

Using the Pulsar Kafka compatibility wrapper

In an existing application, replace the regular Kafka client dependency with the Pulsar Kafka wrapper.

Remove:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>0.10.2.1</version>
</dependency>

Add the dependency for the Pulsar Kafka wrapper:

<dependency>
  <groupId>org.apache.pulsar</groupId>
  <artifactId>pulsar-client-kafka-compat</artifactId>
  <version>{{ site.current_version }}</version>
</dependency>

With the new dependency, the existing code should work without any changes. Only the configuration needs to be adjusted: producers and consumers must point to a Pulsar service rather than to Kafka, and they must use a Pulsar topic.
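As a minimal sketch of that configuration change, the helper below copies an application's existing Kafka properties and overrides only `bootstrap.servers`. The class name `PulsarMigration`, the method name `pointToPulsar`, and the old Kafka broker addresses are hypothetical and used for illustration only; nothing here is part of the wrapper itself.

```java
import java.util.Properties;

// Hypothetical helper, not part of the Pulsar Kafka wrapper.
final class PulsarMigration {

    // Returns a copy of the Kafka configuration in which bootstrap.servers
    // is replaced by a single Pulsar service URL. All other entries are kept,
    // and the input object is left untouched.
    static Properties pointToPulsar(Properties kafkaProps, String pulsarServiceUrl) {
        Properties pulsarProps = new Properties();
        pulsarProps.putAll(kafkaProps);
        pulsarProps.setProperty("bootstrap.servers", pulsarServiceUrl);
        return pulsarProps;
    }

    public static void main(String[] args) {
        Properties kafkaProps = new Properties();
        // Assumed pre-migration value, for illustration only
        kafkaProps.setProperty("bootstrap.servers", "kafka-1:9092,kafka-2:9092");
        kafkaProps.setProperty("group.id", "my-subscription-name");

        Properties pulsarProps = pointToPulsar(kafkaProps, "pulsar://localhost:6650");
        System.out.println(pulsarProps.getProperty("bootstrap.servers"));
    }
}
```

The resulting object can be passed to the unchanged `KafkaProducer` or `KafkaConsumer` constructor. Topic names are not part of the properties and still have to be changed to Pulsar topic names in the application code.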

Producer example

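import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

// The topic needs to be a regular Pulsar topic
String topic = "persistent://sample/standalone/ns/my-topic";

Properties props = new Properties();
// Point to a Pulsar service
props.put("bootstrap.servers", "pulsar://localhost:6650");

props.put("key.serializer", IntegerSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());

Producer<Integer, String> producer = new KafkaProducer<>(props);

for (int i = 0; i < 10; i++) {
    // send() is asynchronous and returns a Future<RecordMetadata>
    producer.send(new ProducerRecord<Integer, String>(topic, i, "hello-" + i));
    // `log` is assumed to be a logger defined elsewhere in the application
    log.info("Message {} queued for sending", i);
}

// Close the producer and release its resources
producer.close();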

Consumer example

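import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

// The topic needs to be a regular Pulsar topic
String topic = "persistent://sample/standalone/ns/my-topic";

Properties props = new Properties();
// Point to a Pulsar service
props.put("bootstrap.servers", "pulsar://localhost:6650");
// The group id is used as the Pulsar subscription name
props.put("group.id", "my-subscription-name");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", IntegerDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());

Consumer<Integer, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));

while (true) {
    // Wait up to 100 ms for new records
    ConsumerRecords<Integer, String> records = consumer.poll(100);
    records.forEach(record -> {
        // `log` is assumed to be a logger defined elsewhere in the application
        log.info("Received record: {}", record);
    });

    // Commit the offsets of the records returned by the last poll
    consumer.commitSync();
}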

Complete examples

You can find the complete producer and consumer examples [here]({{ site.pulsar_repo }}/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples).

Compatibility matrix

Currently the Pulsar Kafka wrapper supports most of the operations offered by the Kafka API.

Producer

APIs:

| Producer Method | Supported | Notes |
|:----------------|:----------|:------|
| `Future<RecordMetadata> send(ProducerRecord<K, V> record)` | Yes | Explicitly setting the partition id when publishing is not currently supported |
| `Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)` | Yes | |
| `void flush()` | Yes | |
| `List<PartitionInfo> partitionsFor(String topic)` | No | |
| `Map<MetricName, ? extends Metric> metrics()` | No | |
| `void close()` | Yes | |
| `void close(long timeout, TimeUnit unit)` | Yes | |

Properties:

| Config property | Supported | Notes |
|:----------------|:----------|:------|
| `acks` | Ignored | Durability and quorum writes are configured at the namespace level |
| `batch.size` | Ignored | |
| `block.on.buffer.full` | Yes | If true, the producer blocks; otherwise it returns an error |
| `bootstrap.servers` | Yes | Needs to point to a single Pulsar service URL |
| `buffer.memory` | Ignored | |
| `client.id` | Ignored | |
| `compression.type` | Yes | Allows `gzip` and `lz4`. No `snappy`. |
| `connections.max.idle.ms` | Ignored | |
| `interceptor.classes` | Ignored | |
| `key.serializer` | Yes | |
| `linger.ms` | Yes | Controls the group commit time when batching messages |
| `max.block.ms` | Ignored | |
| `max.in.flight.requests.per.connection` | Ignored | In Pulsar, ordering is maintained even with multiple requests in flight |
| `max.request.size` | Ignored | |
| `metric.reporters` | Ignored | |
| `metrics.num.samples` | Ignored | |
| `metrics.sample.window.ms` | Ignored | |
| `partitioner.class` | Ignored | |
| `receive.buffer.bytes` | Ignored | |
| `reconnect.backoff.ms` | Ignored | |
| `request.timeout.ms` | Ignored | |
| `retries` | Ignored | The Pulsar client retries with exponential backoff until the send timeout expires |
| `send.buffer.bytes` | Ignored | |
| `timeout.ms` | Ignored | |
| `value.serializer` | Yes | |
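Because ignored properties are dropped silently, it can help to check an existing producer configuration against the table above before migrating. The following is a rough sketch only: the class `ProducerConfigAudit` and the method `notHonored` are hypothetical names, and the key list is copied by hand from the rows marked "Yes", so it has to be kept in sync with the table manually.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of the Pulsar Kafka wrapper.
final class ProducerConfigAudit {

    // Producer properties that the compatibility table marks as supported.
    private static final Set<String> HONORED = new HashSet<>(Arrays.asList(
            "block.on.buffer.full", "bootstrap.servers", "compression.type",
            "key.serializer", "linger.ms", "value.serializer"));

    // Returns, in sorted order, the configured keys that the table does not
    // mark as supported. Keys starting with "pulsar." are skipped because they
    // are Pulsar-specific settings rather than Kafka producer properties.
    // Only entries with String keys and String values are inspected.
    static Set<String> notHonored(Properties props) {
        Set<String> result = new TreeSet<>(props.stringPropertyNames());
        result.removeAll(HONORED);
        result.removeIf(key -> key.startsWith("pulsar."));
        return result;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "pulsar://localhost:6650");
        props.setProperty("linger.ms", "5");
        props.setProperty("acks", "all");
        props.setProperty("retries", "3");

        System.out.println(notHonored(props)); // prints [acks, retries]
    }
}
```

A result such as `[acks, retries]` is a prompt to revisit the notes column: for `acks`, for example, durability is configured at the namespace level instead.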

Consumer

APIs:

| Consumer Method | Supported | Notes |
|:----------------|:----------|:------|
| `Set<TopicPartition> assignment()` | No | |
| `Set<String> subscription()` | Yes | |
| `void subscribe(Collection<String> topics)` | Yes | |
| `void subscribe(Collection<String> topics, ConsumerRebalanceListener callback)` | No | |
| `void assign(Collection<TopicPartition> partitions)` | No | |
| `void subscribe(Pattern pattern, ConsumerRebalanceListener callback)` | No | |
| `void unsubscribe()` | Yes | |
| `ConsumerRecords<K, V> poll(long timeoutMillis)` | Yes | |
| `void commitSync()` | Yes | |
| `void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)` | Yes | |
| `void commitAsync()` | Yes | |
| `void commitAsync(OffsetCommitCallback callback)` | Yes | |
| `void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback)` | Yes | |
| `void seek(TopicPartition partition, long offset)` | No | |
| `void seekToBeginning(Collection<TopicPartition> partitions)` | No | |
| `void seekToEnd(Collection<TopicPartition> partitions)` | No | |
| `long position(TopicPartition partition)` | Yes | |
| `OffsetAndMetadata committed(TopicPartition partition)` | Yes | |
| `Map<MetricName, ? extends Metric> metrics()` | No | |
| `List<PartitionInfo> partitionsFor(String topic)` | No | |
| `Map<String, List<PartitionInfo>> listTopics()` | No | |
| `Set<TopicPartition> paused()` | No | |
| `void pause(Collection<TopicPartition> partitions)` | No | |
| `void resume(Collection<TopicPartition> partitions)` | No | |
| `Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)` | No | |
| `Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions)` | No | |
| `Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions)` | No | |
| `void close()` | Yes | |
| `void close(long timeout, TimeUnit unit)` | Yes | |
| `void wakeup()` | No | |

Properties:

| Config property | Supported | Notes |
|:----------------|:----------|:------|
| `group.id` | Yes | Maps to a Pulsar subscription name |
| `max.poll.records` | Ignored | |
| `max.poll.interval.ms` | Ignored | Messages are “pushed” from the broker |
| `session.timeout.ms` | Ignored | |
| `heartbeat.interval.ms` | Ignored | |
| `bootstrap.servers` | Yes | Needs to point to a single Pulsar service URL |
| `enable.auto.commit` | Yes | |
| `auto.commit.interval.ms` | Ignored | With auto-commit, acks are sent immediately to the broker |
| `partition.assignment.strategy` | Ignored | |
| `auto.offset.reset` | Ignored | |
| `fetch.min.bytes` | Ignored | |
| `fetch.max.bytes` | Ignored | |
| `fetch.max.wait.ms` | Ignored | |
| `metadata.max.age.ms` | Ignored | |
| `max.partition.fetch.bytes` | Ignored | |
| `send.buffer.bytes` | Ignored | |
| `receive.buffer.bytes` | Ignored | |
| `client.id` | Ignored | |

Custom Pulsar configurations

You can configure the Pulsar authentication provider and TLS settings directly through the Kafka properties.

Properties:

| Config property | Default | Notes |
|:----------------|:--------|:------|
| `pulsar.authentication.class` | | Authentication provider class, e.g. `org.apache.pulsar.client.impl.auth.AuthenticationTls` |
| `pulsar.use.tls` | `false` | Enable TLS transport encryption |
| `pulsar.tls.trust.certs.file.path` | | Path to the TLS trust certificate store |
| `pulsar.tls.allow.insecure.connection` | `false` | Accept self-signed certificates from brokers |
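A minimal sketch of enabling TLS transport encryption with these properties might look as follows. The class `PulsarTlsConfig`, the method `tlsProperties`, the service URL `pulsar+ssl://localhost:6651`, and the certificate path are assumptions for illustration; substitute the values of your own deployment. Boolean settings are passed as strings here, which is also an assumption about how the wrapper reads them.

```java
import java.util.Properties;

// Hypothetical helper, not part of the Pulsar Kafka wrapper.
final class PulsarTlsConfig {

    // Builds Kafka-style properties that carry the pulsar.* TLS settings
    // listed in the table above.
    static Properties tlsProperties(String serviceUrl, String trustCertsPath) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", serviceUrl);
        props.setProperty("pulsar.use.tls", "true");
        props.setProperty("pulsar.tls.trust.certs.file.path", trustCertsPath);
        // Kept at the documented default: self-signed broker certificates are not accepted
        props.setProperty("pulsar.tls.allow.insecure.connection", "false");
        return props;
    }

    public static void main(String[] args) {
        // Assumed service URL and certificate location, for illustration only
        Properties props = tlsProperties("pulsar+ssl://localhost:6651", "/path/to/ca.cert.pem");
        props.list(System.out);
    }
}
```

Serializer, deserializer, and `group.id` entries are added to the same object as in the producer and consumer examples. `pulsar.authentication.class` can be set in the same way when the brokers require authentication.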