Add support of pulsar-kafka-adapter for kafka-0.9 api (#4886)

Fixes #4791

**Motivation**

Currently the Pulsar Kafka wrapper is using Kafka 0.10.x version. However, there are users who use legacy-kafka version in their system and willing to move to pulsar. This PR provides pulsar-kafka adapter for kafka-api-version 0.9.X. So, this adapter can help users in their migration process from kafka-0.9 to pulsar.
diff --git a/pulsar-client-kafka-compat/pom.xml b/pulsar-client-kafka-compat/pom.xml
index f25274e..569647f 100644
--- a/pulsar-client-kafka-compat/pom.xml
+++ b/pulsar-client-kafka-compat/pom.xml
@@ -38,7 +38,10 @@
 
   <modules>
     <module>pulsar-client-kafka</module>
+    <module>pulsar-client-kafka_0_9</module>
     <module>pulsar-client-kafka-shaded</module>
+    <module>pulsar-client-kafka-shaded_0_9</module>
     <module>pulsar-client-kafka-tests</module>
+    <module>pulsar-client-kafka-tests_0_9</module>
   </modules>
 </project>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-shaded_0_9/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka-shaded_0_9/pom.xml
new file mode 100644
index 0000000..a6a7ba3
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-shaded_0_9/pom.xml
@@ -0,0 +1,271 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+  xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-client-kafka-compat</artifactId>
+    <version>2.5.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>pulsar-client-kafka_0_9</artifactId>
+  <name>Pulsar Kafka compatibility 0.9 :: API</name>
+
+  <description>Drop-in replacement for Kafka client library that publishes and consumes
+  messages on Pulsar topics</description>
+
+  <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-client-kafka_0_9-original</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <createDependencyReducedPom>true</createDependencyReducedPom>
+              <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+              <artifactSet>
+                <includes>
+                  <include>org.apache.kafka:kafka-clients</include>
+                  <include>org.apache.pulsar:pulsar-client-kafka_0_9-original</include>
+                  <include>org.apache.pulsar:pulsar-client-original</include>
+                  <include>org.apache.commons:commons-lang3</include>
+                  <include>commons-codec:commons-codec</include>
+                  <include>org.apache.bookkeeper:bookkeeper-common-allocator</include>
+                  <include>commons-collections:commons-collections</include>
+                  <include>org.asynchttpclient:*</include>
+                  <include>io.netty:netty-codec-http</include>
+                  <include>io.netty:netty-transport-native-epoll</include>
+                  <include>org.reactivestreams:reactive-streams</include>
+                  <include>com.typesafe.netty:netty-reactive-streams</include>
+                  <include>org.javassist:javassist</include>
+                  <include>com.google.protobuf:protobuf-java</include>
+                  <include>com.google.guava:guava</include>
+                  <include>com.google.code.gson:gson</include>
+                  <include>com.fasterxml.jackson.core</include>
+                  <include>com.fasterxml.jackson.module</include>
+                  <include>com.fasterxml.jackson.dataformat</include>
+                  <include>io.netty:netty</include>
+                  <include>io.netty:netty-*</include>
+                  <include>org.apache.pulsar:pulsar-common</include>
+                  <include>org.apache.bookkeeper:circe-checksum</include>
+                  <include>com.yahoo.datasketches:sketches-core</include>
+                  <include>org.apache.httpcomponents:httpclient</include>
+                  <include>commons-logging:commons-logging</include>
+                  <include>org.apache.httpcomponents:httpcore</include>
+                  <include>org.eclipse.jetty:*</include>
+                  <include>com.yahoo.datasketches:*</include>
+                  <include>commons-*:*</include>
+                  <include>org.yaml:snakeyaml</include>
+                  <include>org.objenesis:*</include>
+
+                  <include>org.apache.avro:*</include>
+                  <!-- Avro transitive dependencies-->
+                  <include>org.codehaus.jackson:jackson-core-asl</include>
+                  <include>org.codehaus.jackson:jackson-mapper-asl</include>
+                  <include>com.thoughtworks.paranamer:paranamer</include>
+                  <include>org.xerial.snappy:snappy-java</include>
+                  <include>org.apache.commons:commons-compress</include>
+                  <include>org.tukaani:xz</include>
+                </includes>
+              </artifactSet>
+               <filters>
+                <filter>
+                   <artifact>commons-logging:commons-logging</artifact>
+                   <includes>
+                       <include>**</include>
+                   </includes>
+                </filter>
+              </filters>
+              <relocations>
+                <relocation>
+                  <pattern>org.apache.kafka.clients.producer.KafkaProducer</pattern>
+                  <shadedPattern>org.apache.kafka.clients.producer.OriginalKafkaProducer</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.kafka.clients.producer.PulsarKafkaProducer</pattern>
+                  <shadedPattern>org.apache.kafka.clients.producer.KafkaProducer</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.kafka.clients.consumer.KafkaConsumer</pattern>
+                  <shadedPattern>org.apache.kafka.clients.consumer.OriginalKafkaConsumer</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.kafka.clients.consumer.PulsarKafkaConsumer</pattern>
+                  <shadedPattern>org.apache.kafka.clients.consumer.KafkaConsumer</shadedPattern>
+                </relocation>
+
+                <!-- General relocation rules for Pulsar client dependencies -->
+
+                <relocation>
+                  <pattern>org.asynchttpclient</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.asynchttpclient</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.commons</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.apache.commons</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.google</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.com.google</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.fasterxml.jackson</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.com.fasterxml.jackson</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>io.netty</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.io.netty</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.yahoo.datasketches</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.com.yahoo.datasketches</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.yahoo.sketches</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.com.yahoo.sketches</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.http</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.apache.http</shadedPattern>
+                </relocation>
+                <relocation>
+                   <pattern>org.eclipse.jetty</pattern>
+                   <shadedPattern>org.apache.pulsar.shade.org.eclipse</shadedPattern>
+                </relocation>
+                <relocation>
+                   <pattern>org.reactivestreams</pattern>
+                   <shadedPattern>org.apache.pulsar.shade.org.reactivestreams</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.typesafe</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.com.typesafe</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.yahoo.memory</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.com.yahoo.memory</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.objenesis</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.objenesis</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.yaml</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.yaml</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.avro</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.apache.avro</shadedPattern>
+                  <excludes>
+                    <exclude>org.apache.avro.reflect.AvroAlias</exclude>
+                    <exclude>org.apache.avro.reflect.AvroDefault</exclude>
+                    <exclude>org.apache.avro.reflect.AvroEncode</exclude>
+                    <exclude>org.apache.avro.reflect.AvroIgnore</exclude>
+                    <exclude>org.apache.avro.reflect.AvroMeta</exclude>
+                    <exclude>org.apache.avro.reflect.AvroName</exclude>
+                    <exclude>org.apache.avro.reflect.AvroSchema</exclude>
+                    <exclude>org.apache.avro.reflect.Nullable</exclude>
+                    <exclude>org.apache.avro.reflect.Stringable</exclude>
+                    <exclude>org.apache.avro.reflect.Union</exclude>
+                  </excludes>
+                </relocation>
+                <!--Avro transitive dependencies-->
+                <relocation>
+                  <pattern>org.codehaus.jackson</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.codehaus.jackson</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.thoughtworks.paranamer</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.com.thoughtworks.paranamer</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.xerial.snappy</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.xerial.snappy</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.commons</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.apache.commons</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.tukaani</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.tukaani</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.apache.bookkeeper</pattern>
+                  <shadedPattern>org.apache.pulsar.shade.org.apache.bookkeeper</shadedPattern>
+                </relocation>
+              </relocations>
+              <filters>
+                <filter>
+                  <artifact>org.apache.pulsar:pulsar-client-original</artifact>
+                  <includes>
+                    <include>**</include>
+                  </includes>
+                </filter>
+              </filters>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <!-- This plugin is used to run a script after the package phase in order to rename
+            libnetty_transport_native_epoll_x86_64.so from Netty into
+            liborg_apache_pulsar_shade_netty_transport_native_epoll_x86_64.so
+            to reflect the shade that is being applied.
+         -->
+        <artifactId>exec-maven-plugin</artifactId>
+        <groupId>org.codehaus.mojo</groupId>
+        <executions>
+          <execution>
+            <id>rename-epoll-library</id>
+            <phase>package</phase>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+            <configuration>
+              <executable>${project.parent.basedir}/../src/${rename.netty.native.libs}</executable>
+              <arguments>
+                <argument>${project.artifactId}</argument>
+              </arguments>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
index aa5e29a..5d9fcfc 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
@@ -61,6 +61,7 @@
             log.info("Message {} sent successfully", i);
         }
 
+        producer.flush();
         producer.close();
     }
 
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
index a95413c..34e008e 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
@@ -45,6 +45,7 @@
             log.info("Message {} sent successfully", i);
         }
 
+        producer.flush();
         producer.close();
     }
 
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/pom.xml
new file mode 100644
index 0000000..c63c8dc
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/pom.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+  xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-client-kafka-compat</artifactId>
+    <version>2.5.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <artifactId>pulsar-client-kafka_0_9-tests</artifactId>
+  <name>Pulsar Kafka compatibility 0.9 :: Tests</name>
+
+  <description>Tests to verify the correct shading configuration for the pulsar-client-kafka wrapper</description>
+
+  <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-client-kafka_0_9</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-broker</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-broker</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>managed-ledger-original</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+  </dependencies>
+
+</project>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ConsumerAvroExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ConsumerAvroExample.java
new file mode 100644
index 0000000..3e39b8d
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ConsumerAvroExample.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.kafka.compat.examples.utils.Bar;
+import org.apache.pulsar.client.kafka.compat.examples.utils.Foo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+public class ConsumerAvroExample {
+
+    public static void main(String[] args) {
+        String topic = "persistent://public/default/test-avro";
+
+        Properties props = new Properties();
+        props.put("bootstrap.servers", "pulsar://localhost:6650");
+        props.put("group.id", "my-subscription-name");
+        props.put("enable.auto.commit", "false");
+        props.put("key.deserializer", IntegerDeserializer.class.getName());
+        props.put("value.deserializer", StringDeserializer.class.getName());
+
+        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+
+        Bar bar = new Bar();
+        bar.setField1(true);
+
+        Foo foo = new Foo();
+        foo.setField1("field1");
+        foo.setField2("field2");
+        foo.setField3(3);
+
+        @SuppressWarnings("resource")
+        Consumer<Foo, Bar> consumer = new KafkaConsumer<>(props, fooSchema, barSchema);
+        consumer.subscribe(Arrays.asList(topic));
+
+        while (true) {
+            ConsumerRecords<Foo, Bar> records = consumer.poll(100);
+            records.forEach(record -> {
+                log.info("Received record: {}", record);
+            });
+
+            // Commit last offset
+            consumer.commitSync();
+        }
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(ConsumerExample.class);
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ConsumerExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ConsumerExample.java
new file mode 100644
index 0000000..983d7b7
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ConsumerExample.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConsumerExample {
+
+    public static void main(String[] args) {
+        String topic = "persistent://public/default/test";
+
+        Properties props = new Properties();
+        props.put("bootstrap.servers", "pulsar://localhost:6650");
+        props.put("group.id", "my-subscription-name");
+        props.put("enable.auto.commit", "false");
+        props.put("key.deserializer", IntegerDeserializer.class.getName());
+        props.put("value.deserializer", StringDeserializer.class.getName());
+
+        @SuppressWarnings("resource")
+        Consumer<Integer, String> consumer = new KafkaConsumer<>(props);
+        consumer.subscribe(Arrays.asList(topic));
+
+        while (true) {
+            ConsumerRecords<Integer, String> records = consumer.poll(100);
+            records.forEach(record -> {
+                log.info("Received record: {}", record);
+            });
+
+            // Commit last offset
+            consumer.commitSync();
+        }
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(ConsumerExample.class);
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
new file mode 100644
index 0000000..5d9fcfc
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.kafka.compat.examples.utils.Bar;
+import org.apache.pulsar.client.kafka.compat.examples.utils.Foo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+public class ProducerAvroExample {
+    public static void main(String[] args) {
+        String topic = "persistent://public/default/test-avro";
+
+        Properties props = new Properties();
+        props.put("bootstrap.servers", "pulsar://localhost:6650");
+
+        props.put("key.serializer", IntegerSerializer.class.getName());
+        props.put("value.serializer", StringSerializer.class.getName());
+
+        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+
+        Bar bar = new Bar();
+        bar.setField1(true);
+
+        Foo foo = new Foo();
+        foo.setField1("field1");
+        foo.setField2("field2");
+        foo.setField3(3);
+
+
+        Producer<Foo, Bar> producer = new KafkaProducer<>(props, fooSchema, barSchema);
+
+        for (int i = 0; i < 10; i++) {
+            producer.send(new ProducerRecord<Foo, Bar>(topic, i, foo, bar));
+            log.info("Message {} sent successfully", i);
+        }
+
+        producer.flush();
+        producer.close();
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(ProducerExample.class);
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
new file mode 100644
index 0000000..f089b26
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ProducerExample {
+    public static void main(String[] args) {
+        String topic = "persistent://public/default/test";
+
+        Properties props = new Properties();
+        props.put("bootstrap.servers", "pulsar://localhost:6650");
+        props.put("key.serializer", IntegerSerializer.class.getName());
+        props.put("value.serializer", StringSerializer.class.getName());
+
+        Producer<Integer, String> producer = new KafkaProducer<>(props);
+
+        for (int i = 0; i < 10; i++) {
+            producer.send(new ProducerRecord<Integer, String>(topic, i, Integer.toString(i)));
+            log.info("Message {} sent successfully", i);
+        }
+
+        producer.flush();
+        producer.close();
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(ProducerExample.class);
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Bar.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Bar.java
new file mode 100644
index 0000000..8120900
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Bar.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples.utils;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+@Data
+@ToString
+@EqualsAndHashCode
+public class Bar {
+    private boolean field1;
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Foo.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Foo.java
new file mode 100644
index 0000000..d584f51
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Foo.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples.utils;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.avro.reflect.Nullable;
+
+@Data
+@ToString
+@EqualsAndHashCode
+public class Foo {
+    @Nullable
+    private String field1;
+    @Nullable
+    private String field2;
+    private int field3;
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index 15e23a2..06b1c6e 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -60,7 +60,7 @@
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
+import org.apache.pulsar.client.util.MessageIdUtils;
 import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
 import org.apache.pulsar.client.kafka.compat.PulsarConsumerKafkaConfig;
 import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index 14dd78b..cacca60 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -57,7 +57,7 @@
 import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
 import org.apache.pulsar.client.kafka.compat.KafkaMessageRouter;
 import org.apache.pulsar.client.kafka.compat.KafkaProducerInterceptorWrapper;
-import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
+import org.apache.pulsar.client.util.MessageIdUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/MessageIdUtils.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/MessageIdUtils.java
deleted file mode 100644
index d8f7680..0000000
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/MessageIdUtils.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.kafka.compat;
-
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.impl.MessageIdImpl;
-
-public class MessageIdUtils {
-    public static final long getOffset(MessageId messageId) {
-        MessageIdImpl msgId = (MessageIdImpl) messageId;
-        long ledgerId = msgId.getLedgerId();
-        long entryId = msgId.getEntryId();
-
-        // Combine ledger id and entry id to form offset
-        // Use less than 32 bits to represent entry id since it will get
-        // rolled over way before overflowing the max int range
-        long offset = (ledgerId << 28) | entryId;
-        return offset;
-    }
-
-    public static final MessageId getMessageId(long offset) {
-        // Demultiplex ledgerId and entryId from offset
-        long ledgerId = offset >>> 28;
-        long entryId = offset & 0x0F_FF_FF_FFL;
-
-        return new MessageIdImpl(ledgerId, entryId, -1);
-    }
-}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/pom.xml
new file mode 100644
index 0000000..0740e78
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/pom.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+  xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-client-kafka-compat</artifactId>
+    <version>2.5.0-SNAPSHOT</version>
+    <relativePath>..</relativePath>
+  </parent>
+
+  <properties>
+    <kafka_0_9.version>0.9.0.1</kafka_0_9.version>
+  </properties>
+
+  <artifactId>pulsar-client-kafka_0_9-original</artifactId>
+  <name>Pulsar Kafka compatibility 0.9 :: API (original)</name>
+
+  <description>Kafka client library that publishes and consumes messages on Pulsar topics</description>
+
+  <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-client</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka_0_9.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>net.jpountz.lz4</groupId>
+          <artifactId>lz4</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.lz4</groupId>
+          <artifactId>lz4-java</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.xerial.snappy</groupId>
+          <artifactId>snappy-java</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
+
+</project>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
new file mode 100644
index 0000000..6d3c383
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -0,0 +1,597 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
+import org.apache.pulsar.client.kafka.compat.PulsarConsumerKafkaConfig;
+import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.MessageIdUtils;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@Slf4j
+public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListener<byte[]> {
+
+    private static final long serialVersionUID = 1L;
+
+    private final PulsarClient client;
+
+    private final Schema<K> keySchema;
+    private final Schema<V> valueSchema;
+
+    private final String groupId;
+    private final boolean isAutoCommit;
+
+    private final ConcurrentMap<TopicPartition, org.apache.pulsar.client.api.Consumer<byte[]>> consumers = new ConcurrentHashMap<>();
+
+    private final Map<TopicPartition, Long> lastReceivedOffset = new ConcurrentHashMap<>();
+    private final Map<TopicPartition, OffsetAndMetadata> lastCommittedOffset = new ConcurrentHashMap<>();
+    private final Set<TopicPartition> unpolledPartitions = new HashSet<>();
+    private final SubscriptionInitialPosition strategy;
+
+    private volatile boolean closed = false;
+
+    private final int maxRecordsInSinglePoll;
+
+    private final Properties properties;
+
+
+    private static class QueueItem {
+        final org.apache.pulsar.client.api.Consumer<byte[]> consumer;
+        final Message<byte[]> message;
+
+        QueueItem(org.apache.pulsar.client.api.Consumer<byte[]> consumer, Message<byte[]> message) {
+            this.consumer = consumer;
+            this.message = message;
+        }
+    }
+
+    // Since a single Kafka consumer can receive from multiple topics, we need to multiplex all the different
+    // topics/partitions into a single queues
+    private final BlockingQueue<QueueItem> receivedMessages = new ArrayBlockingQueue<>(1000);
+
+    public PulsarKafkaConsumer(Map<String, Object> configs) {
+        this(new ConsumerConfig(configs), null, null);
+    }
+
+    public PulsarKafkaConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer,
+                               Deserializer<V> valueDeserializer) {
+        this(new ConsumerConfig(configs),
+                new PulsarKafkaSchema<>(keyDeserializer), new PulsarKafkaSchema<>(valueDeserializer));
+    }
+
+    public PulsarKafkaConsumer(Map<String, Object> configs, Schema<K> keySchema, Schema<V> valueSchema) {
+        this(new ConsumerConfig(configs), keySchema, valueSchema);
+    }
+
+    public PulsarKafkaConsumer(Properties properties) {
+        this(new ConsumerConfig(properties), null, null);
+    }
+
+    public PulsarKafkaConsumer(Properties properties, Deserializer<K> keyDeserializer,
+                               Deserializer<V> valueDeserializer) {
+        this(new ConsumerConfig(properties),
+                new PulsarKafkaSchema<>(keyDeserializer), new PulsarKafkaSchema<>(valueDeserializer));
+    }
+
+    public PulsarKafkaConsumer(Properties properties, Schema<K> keySchema, Schema<V> valueSchema) {
+        this(new ConsumerConfig(properties), keySchema, valueSchema);
+    }
+
+    @SuppressWarnings("unchecked")
+    private PulsarKafkaConsumer(ConsumerConfig consumerConfig, Schema<K> keySchema, Schema<V> valueSchema) {
+
+        if (keySchema == null) {
+            Deserializer<K> kafkaKeyDeserializer = consumerConfig.getConfiguredInstance(
+                    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+            kafkaKeyDeserializer.configure(consumerConfig.originals(), true);
+            this.keySchema = new PulsarKafkaSchema<>(kafkaKeyDeserializer);
+        } else {
+            this.keySchema = keySchema;
+            consumerConfig.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+        }
+
+        if (valueSchema == null) {
+            Deserializer<V> kafkaValueDeserializer = consumerConfig.getConfiguredInstance(
+                    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+            kafkaValueDeserializer.configure(consumerConfig.originals(), true);
+            this.valueSchema = new PulsarKafkaSchema<>(kafkaValueDeserializer);
+        } else {
+            this.valueSchema = valueSchema;
+            consumerConfig.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+        }
+
+        groupId = consumerConfig.getString(ConsumerConfig.GROUP_ID_CONFIG);
+        isAutoCommit = consumerConfig.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+        strategy = getStrategy(consumerConfig.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+        log.info("Offset reset strategy has been assigned value {}", strategy);
+
+        String serviceUrl = consumerConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
+
+        // there is not this config in kafka 0.9, so use default value.
+        maxRecordsInSinglePoll = 1000;
+
+        this.properties = new Properties();
+        consumerConfig.originals().forEach(properties::put);
+        ClientBuilder clientBuilder = PulsarClientKafkaConfig.getClientBuilder(properties);
+        // Since this client instance is going to be used just for the consumers, we can enable Nagle to group
+        // all the acknowledgments sent to broker within a short time frame
+        clientBuilder.enableTcpNoDelay(false);
+        try {
+            client = clientBuilder.serviceUrl(serviceUrl).build();
+        } catch (PulsarClientException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private SubscriptionInitialPosition getStrategy(final String strategy) {
+        if ("earliest".equals(strategy)) {
+            return SubscriptionInitialPosition.Earliest;
+        } else {
+            return SubscriptionInitialPosition.Latest;
+        }
+    }
+
+    @Override
+    public Set<TopicPartition> assignment() {
+        throw new UnsupportedOperationException("Cannot access the partitions assignements");
+    }
+
+    @Override
+    public Set<String> subscription() {
+        return consumers.keySet().stream().map(TopicPartition::topic).collect(Collectors.toSet());
+    }
+
+    @Override
+    public void subscribe(List<String> topics) {
+        subscribe(topics, null);
+    }
+
+    @Override
+    public void subscribe(List<String> topics, ConsumerRebalanceListener callback) {
+        List<CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>>> futures = new ArrayList<>();
+
+        List<TopicPartition> topicPartitions = new ArrayList<>();
+        try {
+            for (String topic : topics) {
+                // Create individual subscription on each partition, that way we can keep using the
+                // acknowledgeCumulative()
+                int numberOfPartitions = ((PulsarClientImpl) client).getNumberOfPartitions(topic).get();
+
+                ConsumerBuilder<byte[]> consumerBuilder = PulsarConsumerKafkaConfig.getConsumerBuilder(client, properties);
+                consumerBuilder.subscriptionType(SubscriptionType.Failover);
+                consumerBuilder.messageListener(this);
+                consumerBuilder.subscriptionName(groupId);
+                consumerBuilder.topics(topics);
+                if (numberOfPartitions > 1) {
+                    // Subscribe to each partition
+                    consumerBuilder.consumerName(ConsumerName.generateRandomName());
+                    for (int i = 0; i < numberOfPartitions; i++) {
+                        String partitionName = TopicName.get(topic).getPartition(i).toString();
+                        CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.clone()
+                                .topic(partitionName).subscribeAsync();
+                        int partitionIndex = i;
+                        TopicPartition tp = new TopicPartition(
+                                TopicName.get(topic).getPartitionedTopicName(),
+                                partitionIndex);
+                        futures.add(future.thenApply(consumer -> {
+                            log.info("Add consumer {} for partition {}", consumer, tp);
+                            consumers.putIfAbsent(tp, consumer);
+                            return consumer;
+                        }));
+                        topicPartitions.add(tp);
+                    }
+                } else {
+                    // Topic has a single partition
+                    CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.topic(topic)
+                            .subscribeAsync();
+                    TopicPartition tp = new TopicPartition(
+                            TopicName.get(topic).getPartitionedTopicName(),
+                            0);
+                    futures.add(future.thenApply(consumer -> {
+                        log.info("Add consumer {} for partition {}", consumer, tp);
+                        consumers.putIfAbsent(tp, consumer);
+                        return consumer;
+                    }));
+                    topicPartitions.add(tp);
+                }
+            }
+            unpolledPartitions.addAll(topicPartitions);
+
+            // Wait for all consumers to be ready
+            futures.forEach(CompletableFuture::join);
+
+            // Notify the listener is now owning all topics/partitions
+            if (callback != null) {
+                callback.onPartitionsAssigned(topicPartitions);
+            }
+
+        } catch (Exception e) {
+            // Close all consumer that might have been successfully created
+            futures.forEach(f -> {
+                try {
+                    f.get().close();
+                } catch (Exception e1) {
+                    // Ignore. Consumer already had failed
+                }
+            });
+
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void assign(List<TopicPartition> list) {
+        throw new UnsupportedOperationException("Cannot manually assign partitions");
+    }
+
+    @Override
+    public void subscribe(Pattern pattern, ConsumerRebalanceListener consumerRebalanceListener) {
+        throw new UnsupportedOperationException("Cannot subscribe with topic name pattern");
+    }
+
+    @Override
+    public void unsubscribe() {
+        consumers.values().forEach(c -> {
+            try {
+                c.unsubscribe();
+            } catch (PulsarClientException e) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    @Override
+    public ConsumerRecords<K, V> poll(long timeoutMillis) {
+        try {
+            QueueItem item = receivedMessages.poll(timeoutMillis, TimeUnit.MILLISECONDS);
+            if (item == null) {
+                return (ConsumerRecords<K, V>) ConsumerRecords.EMPTY;
+            }
+
+            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
+
+            int numberOfRecords = 0;
+
+            while (item != null) {
+                TopicName topicName = TopicName.get(item.consumer.getTopic());
+                String topic = topicName.getPartitionedTopicName();
+                int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0;
+                Message<byte[]> msg = item.message;
+                MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
+                long offset = MessageIdUtils.getOffset(msgId);
+
+                TopicPartition tp = new TopicPartition(topic, partition);
+                if (lastReceivedOffset.get(tp) == null && !unpolledPartitions.contains(tp)) {
+                    log.info("When polling offsets, invalid offsets were detected. Resetting topic partition {}", tp);
+                    resetOffsets(tp);
+                }
+
+                K key = getKey(topic, msg);
+                if (valueSchema instanceof PulsarKafkaSchema) {
+                    ((PulsarKafkaSchema<V>) valueSchema).setTopic(topic);
+                }
+                V value = valueSchema.decode(msg.getData());
+
+                ConsumerRecord<K, V> consumerRecord = new ConsumerRecord<>(topic, partition, offset, key, value);
+
+                records.computeIfAbsent(tp, k -> new ArrayList<>()).add(consumerRecord);
+
+                // Update last offset seen by application
+                lastReceivedOffset.put(tp, offset);
+                unpolledPartitions.remove(tp);
+
+                if (++numberOfRecords >= maxRecordsInSinglePoll) {
+                    break;
+                }
+
+                // Check if we have an item already available
+                item = receivedMessages.poll(0, TimeUnit.MILLISECONDS);
+            }
+
+            if (isAutoCommit && !records.isEmpty()) {
+                // Commit the offset of previously dequeued messages
+                commitAsync();
+            }
+
+            return new ConsumerRecords<>(records);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void commitSync() {
+        try {
+            doCommitOffsets(getCurrentOffsetsMap()).get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        try {
+            doCommitOffsets(offsets).get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void commitAsync() {
+        doCommitOffsets(getCurrentOffsetsMap());
+    }
+
+    @Override
+    public void commitAsync(OffsetCommitCallback callback) {
+        Map<TopicPartition, OffsetAndMetadata> offsets = getCurrentOffsetsMap();
+        doCommitOffsets(offsets).handle((v, throwable) -> {
+            callback.onComplete(offsets, throwable != null ? new Exception(throwable) : null);
+            return null;
+        });
+    }
+
+    @Override
+    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
+        doCommitOffsets(offsets).handle((v, throwable) -> {
+            callback.onComplete(offsets, throwable != null ? new Exception(throwable) : null);
+            return null;
+        });
+    }
+
+    private CompletableFuture<Void> doCommitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+        offsets.forEach((topicPartition, offsetAndMetadata) -> {
+            org.apache.pulsar.client.api.Consumer<byte[]> consumer = consumers.get(topicPartition);
+            lastCommittedOffset.put(topicPartition, offsetAndMetadata);
+            futures.add(consumer.acknowledgeCumulativeAsync(MessageIdUtils.getMessageId(offsetAndMetadata.offset())));
+        });
+
+        return FutureUtil.waitForAll(futures);
+    }
+
+    private Map<TopicPartition, OffsetAndMetadata> getCurrentOffsetsMap() {
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        lastReceivedOffset.forEach((topicPartition, offset) -> {
+            OffsetAndMetadata om = new OffsetAndMetadata(offset);
+            offsets.put(topicPartition, om);
+        });
+
+        return offsets;
+    }
+
+    @Override
+    public void seek(TopicPartition partition, long offset) {
+        MessageId msgId = MessageIdUtils.getMessageId(offset);
+        org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(partition);
+        if (c == null) {
+            throw new IllegalArgumentException("Cannot seek on a partition where we are not subscribed");
+        }
+
+        try {
+            c.seek(msgId);
+        } catch (PulsarClientException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void seekToBeginning(TopicPartition... partitions) {
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+        if (partitions.length == 0) {
+            partitions = consumers.keySet().toArray(new TopicPartition[0]);
+        }
+        lastCommittedOffset.clear();
+        lastReceivedOffset.clear();
+
+        for (TopicPartition tp : partitions) {
+            org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(tp);
+            if (c == null) {
+                futures.add(FutureUtil.failedFuture(
+                        new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
+            } else {
+                futures.add(c.seekAsync(MessageId.earliest));
+            }
+        }
+
+        FutureUtil.waitForAll(futures).join();
+    }
+
+    @Override
+    public void seekToEnd(TopicPartition... partitions) {
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+        if (partitions.length == 0) {
+            partitions = consumers.keySet().toArray(new TopicPartition[0]);
+        }
+        lastCommittedOffset.clear();
+        lastReceivedOffset.clear();
+
+        for (TopicPartition tp : partitions) {
+            org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(tp);
+            if (c == null) {
+                futures.add(FutureUtil.failedFuture(
+                        new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
+            } else {
+                futures.add(c.seekAsync(MessageId.latest));
+            }
+        }
+
+        FutureUtil.waitForAll(futures).join();
+    }
+
+    @Override
+    public long position(TopicPartition partition) {
+        Long offset = lastReceivedOffset.get(partition);
+        if (offset == null && !unpolledPartitions.contains(partition)) {
+            return resetOffsets(partition).getValue();
+        }
+        return unpolledPartitions.contains(partition) ? 0 : offset;
+    }
+
+    @Override
+    public OffsetAndMetadata committed(TopicPartition partition) {
+        return lastCommittedOffset.get(partition);
+    }
+
+    @Override
+    public Map<MetricName, ? extends Metric> metrics() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public List<PartitionInfo> partitionsFor(String s) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Map<String, List<PartitionInfo>> listTopics() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void pause(TopicPartition... topicPartitions) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void resume(TopicPartition... topicPartitions) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void close() {
+        try {
+            closed = true;
+
+            if (isAutoCommit) {
+                commitAsync();
+            }
+
+            client.closeAsync().get(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void wakeup() {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * This method is called whenever a new message is received.
+     * <p>
+     * Messages are guaranteed to be delivered in order and from the same thread for a single consumer
+     * <p>
+     * This method will only be called once for each message, unless either application or broker crashes.
+     * <p>
+     * Application is responsible of handling any exception that could be thrown while processing the message.
+     *
+     * @param consumer the consumer that received the message
+     * @param msg
+     */
+    @Override
+    public void received(org.apache.pulsar.client.api.Consumer<byte[]> consumer, Message<byte[]> msg) {
+        // Block listener thread if the application is slowing down
+        try {
+            receivedMessages.put(new QueueItem(consumer, msg));
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            if (closed) {
+                // Consumer was closed and the thread was interrupted. Nothing to worry about here
+            } else {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private SubscriptionInitialPosition resetOffsets(final TopicPartition partition) {
+        log.info("Resetting partition {} and seeking to {} position", partition, strategy);
+        if (strategy == SubscriptionInitialPosition.Earliest) {
+            seekToBeginning(partition);
+        } else {
+            seekToEnd(partition);
+        }
+        return strategy;
+    }
+
+    @SuppressWarnings("unchecked")
+    private K getKey(String topic, Message<byte[]> msg) {
+        if (!msg.hasKey()) {
+            return null;
+        }
+
+        if (keySchema instanceof PulsarKafkaSchema) {
+            PulsarKafkaSchema<K> pulsarKafkaSchema = (PulsarKafkaSchema) keySchema;
+            Deserializer<K> kafkaDeserializer = pulsarKafkaSchema.getKafkaDeserializer();
+            if (kafkaDeserializer instanceof StringDeserializer) {
+                return (K) msg.getKey();
+            }
+            pulsarKafkaSchema.setTopic(topic);
+        }
+        // Assume base64 encoding
+        byte[] data = Base64.getDecoder().decode(msg.getKey());
+        return keySchema.decode(data);
+
+    }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
new file mode 100644
index 0000000..b853459
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.producer;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.kafka.compat.KafkaMessageRouter;
+import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
+import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
+import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
+import org.apache.pulsar.client.util.MessageIdUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
+
+    private final PulsarClient client;
+    private final ProducerBuilder<byte[]> pulsarProducerBuilder;
+
+    private final ConcurrentMap<String, org.apache.pulsar.client.api.Producer<byte[]>> producers = new ConcurrentHashMap<>();
+
+    private final Schema<K> keySchema;
+    private final Schema<V> valueSchema;
+
+    private final Partitioner partitioner;
+    private volatile Cluster cluster = Cluster.empty();
+
+    private final Properties properties;
+
+    private static final Logger logger = LoggerFactory.getLogger(PulsarKafkaProducer.class);
+
+    public PulsarKafkaProducer(Map<String, Object> configs) {
+        this(new ProducerConfig(configs), null, null);
+    }
+
+    public PulsarKafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer,
+                               Serializer<V> valueSerializer) {
+        this(new ProducerConfig(configs), new PulsarKafkaSchema<>(keySerializer), new PulsarKafkaSchema<>(valueSerializer));
+    }
+
+    public PulsarKafkaProducer(Map<String, Object> configs, Schema<K> keySchema, Schema<V> valueSchema) {
+        this(new ProducerConfig(configs), keySchema, valueSchema);
+    }
+
+    public PulsarKafkaProducer(Properties properties) {
+        this(new ProducerConfig(properties), null, null);
+    }
+
+    public PulsarKafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+        this(new ProducerConfig(properties), new PulsarKafkaSchema<>(keySerializer), new PulsarKafkaSchema<>(valueSerializer));
+    }
+
+    public PulsarKafkaProducer(Properties properties, Schema<K> keySchema, Schema<V> valueSchema) {
+        this(new ProducerConfig(properties), keySchema, valueSchema);
+    }
+
+    @SuppressWarnings({ "unchecked", "deprecation" })
+    private PulsarKafkaProducer(ProducerConfig producerConfig, Schema<K> keySchema, Schema<V> valueSchema) {
+
+        if (keySchema == null) {
+            Serializer<K> kafkaKeySerializer = producerConfig.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
+            kafkaKeySerializer.configure(producerConfig.originals(), true);
+            this.keySchema = new PulsarKafkaSchema<>(kafkaKeySerializer);
+        } else {
+            this.keySchema = keySchema;
+            producerConfig.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+        }
+
+        if (valueSchema == null) {
+            Serializer<V> kafkaValueSerializer = producerConfig.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
+            kafkaValueSerializer.configure(producerConfig.originals(), false);
+            this.valueSchema = new PulsarKafkaSchema<>(kafkaValueSerializer);
+        } else {
+            this.valueSchema = valueSchema;
+            producerConfig.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+        }
+
+        partitioner = producerConfig.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
+        partitioner.configure(producerConfig.originals());
+
+        this.properties = new Properties();
+        producerConfig.originals().forEach(properties::put);
+
+        long keepAliveIntervalMs = Long.parseLong(properties.getProperty(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "30000"));
+
+        String serviceUrl = producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
+        try {
+            // Support Kafka's ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG in ms.
+            // If passed in value is greater than Integer.MAX_VALUE in second will throw IllegalArgumentException.
+            int keepAliveInterval = Math.toIntExact(keepAliveIntervalMs / 1000);
+            client = PulsarClientKafkaConfig.getClientBuilder(properties)
+                                            .serviceUrl(serviceUrl)
+                                            .keepAliveInterval(keepAliveInterval, TimeUnit.SECONDS)
+                                            .build();
+        } catch (ArithmeticException e) {
+            String errorMessage = String.format("Invalid value %d for 'connections.max.idle.ms'. " +
+                    "Please use a value smaller than %d000 milliseconds.", keepAliveIntervalMs, Integer.MAX_VALUE);
+            logger.error(errorMessage);
+            throw new IllegalArgumentException(errorMessage);
+        } catch (PulsarClientException e) {
+            throw new RuntimeException(e);
+        }
+
+        pulsarProducerBuilder = PulsarProducerKafkaConfig.getProducerBuilder(client, properties);
+
+        // To mimic the same batching mode as Kafka, we need to wait a very little amount of
+        // time to batch if the client is trying to send messages fast enough
+        long lingerMs = Long.parseLong(properties.getProperty(ProducerConfig.LINGER_MS_CONFIG, "1"));
+        pulsarProducerBuilder.batchingMaxPublishDelay(lingerMs, TimeUnit.MILLISECONDS);
+
+        String compressionType = properties.getProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG);
+        if ("gzip".equals(compressionType)) {
+            pulsarProducerBuilder.compressionType(CompressionType.ZLIB);
+        } else if ("lz4".equals(compressionType)) {
+            pulsarProducerBuilder.compressionType(CompressionType.LZ4);
+        }
+
+        pulsarProducerBuilder.messageRouter(new KafkaMessageRouter(lingerMs));
+
+        int sendTimeoutMillis = Integer.parseInt(properties.getProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000"));
+        pulsarProducerBuilder.sendTimeout(sendTimeoutMillis, TimeUnit.MILLISECONDS);
+
+        // Kafka blocking semantic when blockOnBufferFull=false is different from Pulsar client
+        // Pulsar throws error immediately when the queue is full and blockIfQueueFull=false
+        // Kafka, on the other hand, still blocks for "max.block.ms" time and then gives error.
+        boolean shouldBlockPulsarProducer = sendTimeoutMillis > 0;
+        pulsarProducerBuilder.blockIfQueueFull(shouldBlockPulsarProducer);
+    }
+
+
+    @Override
+    public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord) {
+        return send(producerRecord, null);
+    }
+
+    @Override
+    public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord, Callback callback) {
+        org.apache.pulsar.client.api.Producer<byte[]> producer;
+
+        try {
+            producer = producers.computeIfAbsent(producerRecord.topic(), topic -> createNewProducer(topic));
+        } catch (Exception e) {
+            if (callback != null) {
+                callback.onCompletion(null, e);
+            }
+            CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
+            future.completeExceptionally(e);
+            return future;
+        }
+
+        TypedMessageBuilder<byte[]> messageBuilder = buildMessage(producer, producerRecord);
+
+        CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
+        messageBuilder.sendAsync().thenAccept((messageId) -> {
+            future.complete(getRecordMetadata(producerRecord.topic(), messageId));
+        }).exceptionally(ex -> {
+            future.completeExceptionally(ex);
+            return null;
+        });
+
+        future.handle((recordMetadata, throwable) -> {
+            if (callback != null) {
+                Exception exception = throwable != null ? new Exception(throwable) : null;
+                callback.onCompletion(recordMetadata, exception);
+            }
+            return null;
+        });
+
+        return future;
+    }
+
+    @Override
+    public void flush() {
+        producers.values().stream()
+                .map(p -> p.flushAsync())
+                .collect(Collectors.toList())
+                .forEach(CompletableFuture::join);
+    }
+
+    @Override
+    public List<PartitionInfo> partitionsFor(String s) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Map<MetricName, ? extends Metric> metrics() {
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public void close() {
+        close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        partitioner.close();
+    }
+
+    @Override
+    public void close(long timeout, TimeUnit unit) {
+        try {
+            client.closeAsync().get(timeout, unit);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private org.apache.pulsar.client.api.Producer<byte[]> createNewProducer(String topic) {
+        try {
+            // Add the partitions info for the new topic
+            cluster = addPartitionsInfo(cluster, topic);
+            return pulsarProducerBuilder.clone()
+                    .topic(topic)
+                    .create();
+        } catch (PulsarClientException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Add the partitions info for the new topic.
+     * Need to ensure the atomicity of the update operation.
+     */
+    private synchronized Cluster addPartitionsInfo(Cluster cluster, String topic) {
+        List<String> partitions = client.getPartitionsForTopic(topic).join();
+        // read partitions info
+        Set<PartitionInfo> partitionsInfo = new HashSet<>();
+        for (int i = 0; i < partitions.size(); i++) {
+            partitionsInfo.add(new PartitionInfo(topic, i, null, null, null));
+        }
+        // create cluster with new partitions info
+        Set<PartitionInfo> combinedPartitions = new HashSet<>();
+        if (cluster.partitionsForTopic(topic) != null) {
+            combinedPartitions.addAll(cluster.partitionsForTopic(topic));
+        }
+        combinedPartitions.addAll(partitionsInfo);
+        return new Cluster(cluster.nodes(), combinedPartitions, new HashSet(cluster.unauthorizedTopics()));
+    }
+
+    private TypedMessageBuilder<byte[]> buildMessage(org.apache.pulsar.client.api.Producer<byte[]> producer, ProducerRecord<K, V> record) {
+        TypedMessageBuilder<byte[]> builder = producer.newMessage();
+
+        byte[] keyBytes = null;
+        if (record.key() != null) {
+            String key = getKey(record.topic(), record.key());
+            keyBytes = key.getBytes(StandardCharsets.UTF_8);
+            builder.key(key);
+        }
+
+        if (valueSchema instanceof PulsarKafkaSchema) {
+            ((PulsarKafkaSchema<V>) valueSchema).setTopic(record.topic());
+        }
+        byte[] value = valueSchema.encode(record.value());
+        builder.value(value);
+
+        if (record.partition() != null) {
+            // Partition was explicitly set on the record
+            builder.property(KafkaMessageRouter.PARTITION_ID, record.partition().toString());
+        } else {
+            // Get the partition id from the partitioner
+            int partition = partitioner.partition(record.topic(), record.key(), keyBytes, record.value(), value, cluster);
+            builder.property(KafkaMessageRouter.PARTITION_ID, Integer.toString(partition));
+        }
+        return builder;
+    }
+
+    private String getKey(String topic, K key) {
+        // If key is a String, we can use it as it is, otherwise, serialize to byte[] and encode in base64
+        if (key instanceof String) {
+            return (String) key;
+        }
+        if (keySchema instanceof PulsarKafkaSchema) {
+            ((PulsarKafkaSchema) keySchema).setTopic(topic);
+        }
+        byte[] keyBytes = keySchema.encode(key);
+        return Base64.getEncoder().encodeToString(keyBytes);
+    }
+
+    private RecordMetadata getRecordMetadata(String topic, MessageId messageId) {
+        MessageIdImpl msgId = (MessageIdImpl) messageId;
+
+        // Combine ledger id and entry id to form offset
+        long offset = MessageIdUtils.getOffset(msgId);
+        int partition = msgId.getPartitionIndex();
+
+        TopicPartition tp = new TopicPartition(topic, partition);
+        return new RecordMetadata(tp, offset, 0L);
+    }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaMessageRouter.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaMessageRouter.java
new file mode 100644
index 0000000..d3fb915
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaMessageRouter.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.pulsar.client.api.HashingScheme;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.TopicMetadata;
+import org.apache.pulsar.client.impl.RoundRobinPartitionMessageRouterImpl;
+
+public class KafkaMessageRouter extends RoundRobinPartitionMessageRouterImpl {
+
+    public static final String PARTITION_ID = "pulsar.partition.id";
+
+    public KafkaMessageRouter(long maxBatchingDelayMs) {
+        super(HashingScheme.JavaStringHash, ThreadLocalRandom.current().nextInt(), true, maxBatchingDelayMs);
+    }
+
+    @Override
+    public int choosePartition(Message<?> msg, TopicMetadata metadata) {
+        if (msg.hasProperty(PARTITION_ID)) {
+            return Integer.parseInt(msg.getProperty(PARTITION_ID));
+        } else {
+            return super.choosePartition(msg, metadata);
+        }
+    }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
new file mode 100644
index 0000000..c38d93d
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+
+public class PulsarClientKafkaConfig {
+
+    /// Config variables
+    public static final String AUTHENTICATION_CLASS = "pulsar.authentication.class";
+    public static final String AUTHENTICATION_PARAMS_MAP = "pulsar.authentication.params.map";
+    public static final String AUTHENTICATION_PARAMS_STRING = "pulsar.authentication.params.string";
+    public static final String USE_TLS = "pulsar.use.tls";
+    public static final String TLS_TRUST_CERTS_FILE_PATH = "pulsar.tls.trust.certs.file.path";
+    public static final String TLS_ALLOW_INSECURE_CONNECTION = "pulsar.tls.allow.insecure.connection";
+    public static final String TLS_HOSTNAME_VERIFICATION = "pulsar.tls.hostname.verification";
+
+    public static final String OPERATION_TIMEOUT_MS = "pulsar.operation.timeout.ms";
+    public static final String STATS_INTERVAL_SECONDS = "pulsar.stats.interval.seconds";
+    public static final String NUM_IO_THREADS = "pulsar.num.io.threads";
+
+    public static final String CONNECTIONS_PER_BROKER = "pulsar.connections.per.broker";
+
+    public static final String USE_TCP_NODELAY = "pulsar.use.tcp.nodelay";
+
+    public static final String CONCURRENT_LOOKUP_REQUESTS = "pulsar.concurrent.lookup.requests";
+    public static final String MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION = "pulsar.max.number.rejected.request.per.connection";
+
+    public static ClientBuilder getClientBuilder(Properties properties) {
+        ClientBuilder clientBuilder = PulsarClient.builder();
+
+        if (properties.containsKey(AUTHENTICATION_CLASS)) {
+            String className = properties.getProperty(AUTHENTICATION_CLASS);
+            try {
+                if (properties.containsKey(AUTHENTICATION_PARAMS_STRING)) {
+                    String authParamsString = (String) properties.get(AUTHENTICATION_PARAMS_STRING);
+                    clientBuilder.authentication(className, authParamsString);
+                } else if (properties.containsKey(AUTHENTICATION_PARAMS_MAP)) {
+                    Map<String, String> authParams = (Map<String, String>) properties.get(AUTHENTICATION_PARAMS_MAP);
+                    clientBuilder.authentication(className, authParams);
+                } else {
+                    @SuppressWarnings("unchecked")
+                    Class<Authentication> clazz = (Class<Authentication>) Class.forName(className);
+                    Authentication auth = clazz.newInstance();
+                    clientBuilder.authentication(auth);
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        clientBuilder.enableTls(Boolean.parseBoolean(properties.getProperty(USE_TLS, "false")));
+        clientBuilder.allowTlsInsecureConnection(
+                Boolean.parseBoolean(properties.getProperty(TLS_ALLOW_INSECURE_CONNECTION, "false")));
+        clientBuilder.enableTlsHostnameVerification(
+                Boolean.parseBoolean(properties.getProperty(TLS_HOSTNAME_VERIFICATION, "false")));
+
+        if (properties.containsKey(TLS_TRUST_CERTS_FILE_PATH)) {
+            clientBuilder.tlsTrustCertsFilePath(properties.getProperty(TLS_TRUST_CERTS_FILE_PATH));
+        }
+
+        if (properties.containsKey(OPERATION_TIMEOUT_MS)) {
+            clientBuilder.operationTimeout(Integer.parseInt(properties.getProperty(OPERATION_TIMEOUT_MS)),
+                    TimeUnit.MILLISECONDS);
+        }
+
+        if (properties.containsKey(STATS_INTERVAL_SECONDS)) {
+            clientBuilder.statsInterval(Integer.parseInt(properties.getProperty(STATS_INTERVAL_SECONDS)),
+                    TimeUnit.SECONDS);
+        }
+
+        if (properties.containsKey(NUM_IO_THREADS)) {
+            clientBuilder.ioThreads(Integer.parseInt(properties.getProperty(NUM_IO_THREADS)));
+        }
+
+        if (properties.containsKey(CONNECTIONS_PER_BROKER)) {
+            clientBuilder.connectionsPerBroker(Integer.parseInt(properties.getProperty(CONNECTIONS_PER_BROKER)));
+        }
+
+        if (properties.containsKey(USE_TCP_NODELAY)) {
+            clientBuilder.enableTcpNoDelay(Boolean.parseBoolean(properties.getProperty(USE_TCP_NODELAY)));
+        }
+
+        if (properties.containsKey(CONCURRENT_LOOKUP_REQUESTS)) {
+            clientBuilder.maxConcurrentLookupRequests(Integer.parseInt(properties.getProperty(CONCURRENT_LOOKUP_REQUESTS)));
+        }
+
+        if (properties.containsKey(MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION)) {
+            clientBuilder.maxNumberOfRejectedRequestPerConnection(
+                    Integer.parseInt(properties.getProperty(MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION)));
+        }
+
+        return clientBuilder;
+    }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
new file mode 100644
index 0000000..a527827
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
+
+public class PulsarConsumerKafkaConfig {
+
+    /// Config variables
+    public static final String CONSUMER_NAME = "pulsar.consumer.name";
+    public static final String RECEIVER_QUEUE_SIZE = "pulsar.consumer.receiver.queue.size";
+    public static final String ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS = "pulsar.consumer.acknowledgments.group.time.millis";
+    public static final String TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS = "pulsar.consumer.total.receiver.queue.size.across.partitions";
+    public static final String SUBSCRIPTION_TOPICS_MODE = "pulsar.consumer.subscription.topics.mode";
+
+    public static ConsumerBuilder<byte[]> getConsumerBuilder(PulsarClient client, Properties properties) {
+        ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer();
+
+        if (properties.containsKey(CONSUMER_NAME)) {
+            consumerBuilder.consumerName(properties.getProperty(CONSUMER_NAME));
+        }
+
+        if (properties.containsKey(RECEIVER_QUEUE_SIZE)) {
+            consumerBuilder.receiverQueueSize(Integer.parseInt(properties.getProperty(RECEIVER_QUEUE_SIZE)));
+        }
+
+        if (properties.containsKey(TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS)) {
+            consumerBuilder.maxTotalReceiverQueueSizeAcrossPartitions(
+                    Integer.parseInt(properties.getProperty(TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS)));
+        }
+
+        if (properties.containsKey(ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS)) {
+            consumerBuilder.acknowledgmentGroupTime(
+                    Long.parseLong(properties.getProperty(ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS)), TimeUnit.MILLISECONDS);
+        }
+
+        if (properties.containsKey(SUBSCRIPTION_TOPICS_MODE)) {
+            RegexSubscriptionMode mode;
+            try {
+                mode = RegexSubscriptionMode.valueOf(properties.getProperty(SUBSCRIPTION_TOPICS_MODE));
+            } catch (IllegalArgumentException e) {
+                throw new IllegalArgumentException("Illegal subscription mode, valid values are: "
+                    + Arrays.asList(RegexSubscriptionMode.values()));
+            }
+            consumerBuilder.subscriptionTopicsMode(mode);
+        }
+
+        return consumerBuilder;
+    }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java
new file mode 100644
index 0000000..807f482
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class PulsarKafkaSchema<T> implements Schema<T> {
+
+    private final Serializer<T> kafkaSerializer;
+
+    private final Deserializer<T> kafkaDeserializer;
+
+    private String topic;
+
+    public PulsarKafkaSchema(Serializer<T> serializer) {
+        this(serializer, null);
+    }
+
+    public PulsarKafkaSchema(Deserializer<T> deserializer) {
+        this(null, deserializer);
+    }
+
+    public PulsarKafkaSchema(Serializer<T> serializer, Deserializer<T> deserializer) {
+        this.kafkaSerializer = serializer;
+        this.kafkaDeserializer = deserializer;
+    }
+
+    public Serializer<T> getKafkaSerializer() {
+        return kafkaSerializer;
+    }
+
+    public Deserializer<T> getKafkaDeserializer() {
+        return kafkaDeserializer;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    @Override
+    public byte[] encode(T message) {
+        checkArgument(kafkaSerializer != null, "Kafka serializer is not initialized yet");
+        return kafkaSerializer.serialize(this.topic, message);
+    }
+
+    @Override
+    public T decode(byte[] message) {
+        checkArgument(kafkaDeserializer != null, "Kafka deserializer is not initialized yet");
+        return kafkaDeserializer.deserialize(this.topic, message);
+    }
+
+    @Override
+    public SchemaInfo getSchemaInfo() {
+        return Schema.BYTES.getSchemaInfo();
+    }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
new file mode 100644
index 0000000..5a9a651
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat;
+
+import java.util.Properties;
+
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+
+public class PulsarProducerKafkaConfig {
+
+    /// Config variables
+    public static final String PRODUCER_NAME = "pulsar.producer.name";
+    public static final String INITIAL_SEQUENCE_ID = "pulsar.producer.initial.sequence.id";
+
+    public static final String MAX_PENDING_MESSAGES = "pulsar.producer.max.pending.messages";
+    public static final String MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = "pulsar.producer.max.pending.messages.across.partitions";
+    public static final String BATCHING_ENABLED = "pulsar.producer.batching.enabled";
+    public static final String BATCHING_MAX_MESSAGES = "pulsar.producer.batching.max.messages";
+
+    public static ProducerBuilder<byte[]> getProducerBuilder(PulsarClient client, Properties properties) {
+        ProducerBuilder<byte[]> producerBuilder = client.newProducer();
+
+        if (properties.containsKey(PRODUCER_NAME)) {
+            producerBuilder.producerName(properties.getProperty(PRODUCER_NAME));
+        }
+
+        if (properties.containsKey(INITIAL_SEQUENCE_ID)) {
+            producerBuilder.initialSequenceId(Long.parseLong(properties.getProperty(INITIAL_SEQUENCE_ID)));
+        }
+
+        if (properties.containsKey(MAX_PENDING_MESSAGES)) {
+            producerBuilder.maxPendingMessages(Integer.parseInt(properties.getProperty(MAX_PENDING_MESSAGES)));
+        }
+
+        if (properties.containsKey(MAX_PENDING_MESSAGES_ACROSS_PARTITIONS)) {
+            producerBuilder.maxPendingMessagesAcrossPartitions(
+                    Integer.parseInt(properties.getProperty(MAX_PENDING_MESSAGES_ACROSS_PARTITIONS)));
+        }
+
+        producerBuilder.enableBatching(Boolean.parseBoolean(properties.getProperty(BATCHING_ENABLED, "true")));
+
+        if (properties.containsKey(BATCHING_MAX_MESSAGES)) {
+            producerBuilder.batchingMaxMessages(Integer.parseInt(properties.getProperty(BATCHING_MAX_MESSAGES)));
+        }
+
+        return producerBuilder;
+    }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
new file mode 100644
index 0000000..4d12418
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.producer;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.anyVararg;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+import org.apache.avro.reflect.Nullable;
+import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
+import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.testng.Assert;
+import org.testng.IObjectFactory;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+@PrepareForTest({PulsarClientKafkaConfig.class, PulsarProducerKafkaConfig.class})
+@PowerMockIgnore({"org.apache.logging.log4j.*", "org.apache.kafka.clients.producer.ProducerInterceptor"})
+public class PulsarKafkaProducerTest {
+
+    @Data
+    @ToString
+    @EqualsAndHashCode
+    public static class Foo {
+        @Nullable
+        private String field1;
+        @Nullable
+        private String field2;
+        private int field3;
+    }
+
+    @Data
+    @ToString
+    @EqualsAndHashCode
+    public static class Bar {
+        private boolean field1;
+    }
+
+    @ObjectFactory
+    // Necessary to make PowerMockito.mockStatic work with TestNG.
+    public IObjectFactory getObjectFactory() {
+        return new org.powermock.modules.testng.PowerMockObjectFactory();
+    }
+
+    @Test
+    public void testPulsarKafkaProducer() {
+        ClientBuilder mockClientBuilder = mock(ClientBuilder.class);
+        ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class);
+        doAnswer(invocation -> {
+            Assert.assertEquals((int)invocation.getArguments()[0], 1000000, "Send time out is suppose to be 1000.");
+            return mockProducerBuilder;
+        }).when(mockProducerBuilder).sendTimeout(anyInt(), any(TimeUnit.class));
+        doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString());
+        doAnswer(invocation -> {
+            Assert.assertEquals((int)invocation.getArguments()[0], 1000, "Keep alive interval is suppose to be 1000.");
+            return mockClientBuilder;
+        }).when(mockClientBuilder).keepAliveInterval(anyInt(), any(TimeUnit.class));
+
+        PowerMockito.mockStatic(PulsarClientKafkaConfig.class);
+        PowerMockito.mockStatic(PulsarProducerKafkaConfig.class);
+        when(PulsarClientKafkaConfig.getClientBuilder(any(Properties.class))).thenReturn(mockClientBuilder);
+        when(PulsarProducerKafkaConfig.getProducerBuilder(any(), any())).thenReturn(mockProducerBuilder);
+
+        Properties properties = new Properties();
+        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
+        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
+        properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000");
+        properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");
+
+        new PulsarKafkaProducer<>(properties);
+
+        verify(mockClientBuilder, times(1)).keepAliveInterval(1000, TimeUnit.SECONDS);
+        verify(mockProducerBuilder, times(1)).sendTimeout(1000000, TimeUnit.MILLISECONDS);
+    }
+
+    @Test
+    public void testPulsarKafkaSendAvro() throws PulsarClientException {
+        // Arrange
+        PulsarClient mockClient = mock(PulsarClient.class);
+        ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class);
+        org.apache.pulsar.client.api.Producer mockProducer = mock(org.apache.pulsar.client.api.Producer.class);
+        ClientBuilder mockClientBuilder = mock(ClientBuilder.class);
+        CompletableFuture mockPartitionFuture = new CompletableFuture();
+        CompletableFuture mockSendAsyncFuture = new CompletableFuture();
+        TypedMessageBuilder mockTypedMessageBuilder = mock(TypedMessageBuilderImpl.class);
+
+        mockPartitionFuture.complete(new ArrayList<>());
+        mockSendAsyncFuture.complete(new MessageIdImpl(1, 1, 1));
+        doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString());
+        doReturn(mockClientBuilder).when(mockClientBuilder).keepAliveInterval(anyInt(), any(TimeUnit.class));
+        doReturn(mockClient).when(mockClientBuilder).build();
+        doReturn(mockPartitionFuture).when(mockClient).getPartitionsForTopic(anyString());
+        doReturn(mockProducerBuilder).when(mockProducerBuilder).topic(anyString());
+        doReturn(mockProducerBuilder).when(mockProducerBuilder).clone();
+        doReturn(mockProducer).when(mockProducerBuilder).create();
+        doReturn(mockTypedMessageBuilder).when(mockProducer).newMessage();
+        doReturn(mockSendAsyncFuture).when(mockTypedMessageBuilder).sendAsync();
+        PowerMockito.mockStatic(PulsarClientKafkaConfig.class);
+        PowerMockito.mockStatic(PulsarProducerKafkaConfig.class);
+        when(PulsarClientKafkaConfig.getClientBuilder(any(Properties.class))).thenReturn(mockClientBuilder);
+        when(PulsarProducerKafkaConfig.getProducerBuilder(any(PulsarClient.class), any(Properties.class))).thenReturn(mockProducerBuilder);
+
+        Properties properties = new Properties();
+        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
+        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
+        properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000");
+        properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");
+
+        AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+        AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+        // Act
+        PulsarKafkaProducer<Foo, Bar> pulsarKafkaProducer = new PulsarKafkaProducer<>(properties, fooSchema, barSchema);
+
+        Bar bar = new Bar();
+        bar.setField1(true);
+
+        Foo foo = new Foo();
+        foo.setField1("field1");
+        foo.setField2("field2");
+        foo.setField3(3);
+
+        pulsarKafkaProducer.send(new ProducerRecord<>("topic", 1, foo, bar));
+
+        // Verify
+        verify(mockTypedMessageBuilder).sendAsync();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Invalid value 2147483648000 for 'connections.max.idle.ms'. Please use a value smaller than 2147483647000 milliseconds.")
+    public void testPulsarKafkaProducerKeepAliveIntervalIllegalArgumentException() {
+        Properties properties = new Properties();
+        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
+        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
+        properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, Long.toString((Integer.MAX_VALUE + 1L) * 1000));
+
+        new PulsarKafkaProducer<>(properties);
+    }
+
+}