Add support of pulsar-kafka-adapter for kafka-0.9 api (#4886)
Fixes #4791
**Motivation**
Currently the Pulsar Kafka wrapper uses the Kafka 0.10.x client API. However, some users run a legacy Kafka version in their systems and are willing to move to Pulsar. This PR provides a pulsar-kafka adapter for the Kafka 0.9.x API, so the adapter can help users migrate from Kafka 0.9 to Pulsar.
diff --git a/pulsar-client-kafka-compat/pom.xml b/pulsar-client-kafka-compat/pom.xml
index f25274e..569647f 100644
--- a/pulsar-client-kafka-compat/pom.xml
+++ b/pulsar-client-kafka-compat/pom.xml
@@ -38,7 +38,10 @@
<modules>
<module>pulsar-client-kafka</module>
+ <module>pulsar-client-kafka_0_9</module>
<module>pulsar-client-kafka-shaded</module>
+ <module>pulsar-client-kafka-shaded_0_9</module>
<module>pulsar-client-kafka-tests</module>
+ <module>pulsar-client-kafka-tests_0_9</module>
</modules>
</project>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-shaded_0_9/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka-shaded_0_9/pom.xml
new file mode 100644
index 0000000..a6a7ba3
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-shaded_0_9/pom.xml
@@ -0,0 +1,271 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client-kafka-compat</artifactId>
+ <version>2.5.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>pulsar-client-kafka_0_9</artifactId>
+ <name>Pulsar Kafka compatibility 0.9 :: API</name>
+
+ <description>Drop-in replacement for Kafka client library that publishes and consumes
+ messages on Pulsar topics</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-client-kafka_0_9-original</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ <promoteTransitiveDependencies>true</promoteTransitiveDependencies>
+ <artifactSet>
+ <includes>
+ <include>org.apache.kafka:kafka-clients</include>
+ <include>org.apache.pulsar:pulsar-client-kafka_0_9-original</include>
+ <include>org.apache.pulsar:pulsar-client-original</include>
+ <include>org.apache.commons:commons-lang3</include>
+ <include>commons-codec:commons-codec</include>
+ <include>org.apache.bookkeeper:bookkeeper-common-allocator</include>
+ <include>commons-collections:commons-collections</include>
+ <include>org.asynchttpclient:*</include>
+ <include>io.netty:netty-codec-http</include>
+ <include>io.netty:netty-transport-native-epoll</include>
+ <include>org.reactivestreams:reactive-streams</include>
+ <include>com.typesafe.netty:netty-reactive-streams</include>
+ <include>org.javassist:javassist</include>
+ <include>com.google.protobuf:protobuf-java</include>
+ <include>com.google.guava:guava</include>
+ <include>com.google.code.gson:gson</include>
+ <include>com.fasterxml.jackson.core</include>
+ <include>com.fasterxml.jackson.module</include>
+ <include>com.fasterxml.jackson.dataformat</include>
+ <include>io.netty:netty</include>
+ <include>io.netty:netty-*</include>
+ <include>org.apache.pulsar:pulsar-common</include>
+ <include>org.apache.bookkeeper:circe-checksum</include>
+ <include>com.yahoo.datasketches:sketches-core</include>
+ <include>org.apache.httpcomponents:httpclient</include>
+ <include>commons-logging:commons-logging</include>
+ <include>org.apache.httpcomponents:httpcore</include>
+ <include>org.eclipse.jetty:*</include>
+ <include>com.yahoo.datasketches:*</include>
+ <include>commons-*:*</include>
+ <include>org.yaml:snakeyaml</include>
+ <include>org.objenesis:*</include>
+
+ <include>org.apache.avro:*</include>
+ <!-- Avro transitive dependencies-->
+ <include>org.codehaus.jackson:jackson-core-asl</include>
+ <include>org.codehaus.jackson:jackson-mapper-asl</include>
+ <include>com.thoughtworks.paranamer:paranamer</include>
+ <include>org.xerial.snappy:snappy-java</include>
+ <include>org.apache.commons:commons-compress</include>
+ <include>org.tukaani:xz</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>commons-logging:commons-logging</artifact>
+ <includes>
+ <include>**</include>
+ </includes>
+ </filter>
+ </filters>
+ <relocations>
+ <relocation>
+ <pattern>org.apache.kafka.clients.producer.KafkaProducer</pattern>
+ <shadedPattern>org.apache.kafka.clients.producer.OriginalKafkaProducer</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.kafka.clients.producer.PulsarKafkaProducer</pattern>
+ <shadedPattern>org.apache.kafka.clients.producer.KafkaProducer</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.kafka.clients.consumer.KafkaConsumer</pattern>
+ <shadedPattern>org.apache.kafka.clients.consumer.OriginalKafkaConsumer</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.kafka.clients.consumer.PulsarKafkaConsumer</pattern>
+ <shadedPattern>org.apache.kafka.clients.consumer.KafkaConsumer</shadedPattern>
+ </relocation>
+
+ <!-- General relocation rules for Pulsar client dependencies -->
+
+ <relocation>
+ <pattern>org.asynchttpclient</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.asynchttpclient</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.commons</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.apache.commons</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.google</pattern>
+ <shadedPattern>org.apache.pulsar.shade.com.google</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.fasterxml.jackson</pattern>
+ <shadedPattern>org.apache.pulsar.shade.com.fasterxml.jackson</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>io.netty</pattern>
+ <shadedPattern>org.apache.pulsar.shade.io.netty</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.yahoo.datasketches</pattern>
+ <shadedPattern>org.apache.pulsar.shade.com.yahoo.datasketches</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.yahoo.sketches</pattern>
+ <shadedPattern>org.apache.pulsar.shade.com.yahoo.sketches</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.http</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.apache.http</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.eclipse.jetty</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.eclipse.jetty</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.reactivestreams</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.reactivestreams</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.typesafe</pattern>
+ <shadedPattern>org.apache.pulsar.shade.com.typesafe</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.yahoo.memory</pattern>
+ <shadedPattern>org.apache.pulsar.shade.com.yahoo.memory</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.objenesis</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.objenesis</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.yaml</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.yaml</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.avro</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.apache.avro</shadedPattern>
+ <excludes>
+ <exclude>org.apache.avro.reflect.AvroAlias</exclude>
+ <exclude>org.apache.avro.reflect.AvroDefault</exclude>
+ <exclude>org.apache.avro.reflect.AvroEncode</exclude>
+ <exclude>org.apache.avro.reflect.AvroIgnore</exclude>
+ <exclude>org.apache.avro.reflect.AvroMeta</exclude>
+ <exclude>org.apache.avro.reflect.AvroName</exclude>
+ <exclude>org.apache.avro.reflect.AvroSchema</exclude>
+ <exclude>org.apache.avro.reflect.Nullable</exclude>
+ <exclude>org.apache.avro.reflect.Stringable</exclude>
+ <exclude>org.apache.avro.reflect.Union</exclude>
+ </excludes>
+ </relocation>
+ <!--Avro transitive dependencies-->
+ <relocation>
+ <pattern>org.codehaus.jackson</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.codehaus.jackson</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.thoughtworks.paranamer</pattern>
+ <shadedPattern>org.apache.pulsar.shade.com.thoughtworks.paranamer</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.xerial.snappy</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.xerial.snappy</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.commons</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.apache.commons</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.tukaani</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.tukaani</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.bookkeeper</pattern>
+ <shadedPattern>org.apache.pulsar.shade.org.apache.bookkeeper</shadedPattern>
+ </relocation>
+ </relocations>
+ <filters>
+ <filter>
+ <artifact>org.apache.pulsar:pulsar-client-original</artifact>
+ <includes>
+ <include>**</include>
+ </includes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <!-- This plugin is used to run a script after the package phase in order to rename
+ libnetty_transport_native_epoll_x86_64.so from Netty into
+ liborg_apache_pulsar_shade_netty_transport_native_epoll_x86_64.so
+ to reflect the shade that is being applied.
+ -->
+ <artifactId>exec-maven-plugin</artifactId>
+ <groupId>org.codehaus.mojo</groupId>
+ <executions>
+ <execution>
+ <id>rename-epoll-library</id>
+ <phase>package</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ <configuration>
+ <executable>${project.parent.basedir}/../src/${rename.netty.native.libs}</executable>
+ <arguments>
+ <argument>${project.artifactId}</argument>
+ </arguments>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
index aa5e29a..5d9fcfc 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
@@ -61,6 +61,7 @@
log.info("Message {} sent successfully", i);
}
+ producer.flush();
producer.close();
}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
index a95413c..34e008e 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
@@ -45,6 +45,7 @@
log.info("Message {} sent successfully", i);
}
+ producer.flush();
producer.close();
}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/pom.xml
new file mode 100644
index 0000000..c63c8dc
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/pom.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client-kafka-compat</artifactId>
+ <version>2.5.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>pulsar-client-kafka_0_9-tests</artifactId>
+ <name>Pulsar Kafka compatibility 0.9 :: Tests</name>
+
+ <description>Tests to verify the correct shading configuration for the pulsar-client-kafka_0_9 wrapper</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-client-kafka_0_9</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-broker</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-broker</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>managed-ledger-original</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ConsumerAvroExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ConsumerAvroExample.java
new file mode 100644
index 0000000..3e39b8d
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ConsumerAvroExample.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.kafka.compat.examples.utils.Bar;
+import org.apache.pulsar.client.kafka.compat.examples.utils.Foo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+public class ConsumerAvroExample {
+
+ public static void main(String[] args) {
+ String topic = "persistent://public/default/test-avro";
+
+ Properties props = new Properties();
+ props.put("bootstrap.servers", "pulsar://localhost:6650");
+ props.put("group.id", "my-subscription-name");
+ props.put("enable.auto.commit", "false");
+ props.put("key.deserializer", IntegerDeserializer.class.getName());
+ props.put("value.deserializer", StringDeserializer.class.getName());
+
+ AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+ AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+
+ Bar bar = new Bar();
+ bar.setField1(true);
+
+ Foo foo = new Foo();
+ foo.setField1("field1");
+ foo.setField2("field2");
+ foo.setField3(3);
+
+ @SuppressWarnings("resource")
+ Consumer<Foo, Bar> consumer = new KafkaConsumer<>(props, fooSchema, barSchema);
+ consumer.subscribe(Arrays.asList(topic));
+
+ while (true) {
+ ConsumerRecords<Foo, Bar> records = consumer.poll(100);
+ records.forEach(record -> {
+ log.info("Received record: {}", record);
+ });
+
+ // Commit last offset
+ consumer.commitSync();
+ }
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(ConsumerAvroExample.class);
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ConsumerExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ConsumerExample.java
new file mode 100644
index 0000000..983d7b7
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ConsumerExample.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ConsumerExample {
+
+ public static void main(String[] args) {
+ String topic = "persistent://public/default/test";
+
+ Properties props = new Properties();
+ props.put("bootstrap.servers", "pulsar://localhost:6650");
+ props.put("group.id", "my-subscription-name");
+ props.put("enable.auto.commit", "false");
+ props.put("key.deserializer", IntegerDeserializer.class.getName());
+ props.put("value.deserializer", StringDeserializer.class.getName());
+
+ @SuppressWarnings("resource")
+ Consumer<Integer, String> consumer = new KafkaConsumer<>(props);
+ consumer.subscribe(Arrays.asList(topic));
+
+ while (true) {
+ ConsumerRecords<Integer, String> records = consumer.poll(100);
+ records.forEach(record -> {
+ log.info("Received record: {}", record);
+ });
+
+ // Commit last offset
+ consumer.commitSync();
+ }
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(ConsumerExample.class);
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
new file mode 100644
index 0000000..5d9fcfc
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerAvroExample.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.kafka.compat.examples.utils.Bar;
+import org.apache.pulsar.client.kafka.compat.examples.utils.Foo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+public class ProducerAvroExample {
+ public static void main(String[] args) {
+ String topic = "persistent://public/default/test-avro";
+
+ Properties props = new Properties();
+ props.put("bootstrap.servers", "pulsar://localhost:6650");
+
+ props.put("key.serializer", IntegerSerializer.class.getName());
+ props.put("value.serializer", StringSerializer.class.getName());
+
+ AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+ AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+
+ Bar bar = new Bar();
+ bar.setField1(true);
+
+ Foo foo = new Foo();
+ foo.setField1("field1");
+ foo.setField2("field2");
+ foo.setField3(3);
+
+
+ Producer<Foo, Bar> producer = new KafkaProducer<>(props, fooSchema, barSchema);
+
+ for (int i = 0; i < 10; i++) {
+ producer.send(new ProducerRecord<Foo, Bar>(topic, i, foo, bar));
+ log.info("Message {} sent successfully", i);
+ }
+
+ producer.flush();
+ producer.close();
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(ProducerAvroExample.class);
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
new file mode 100644
index 0000000..f089b26
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/ProducerExample.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ProducerExample {
+ public static void main(String[] args) {
+ String topic = "persistent://public/default/test";
+
+ Properties props = new Properties();
+ props.put("bootstrap.servers", "pulsar://localhost:6650");
+ props.put("key.serializer", IntegerSerializer.class.getName());
+ props.put("value.serializer", StringSerializer.class.getName());
+
+ Producer<Integer, String> producer = new KafkaProducer<>(props);
+
+ for (int i = 0; i < 10; i++) {
+ producer.send(new ProducerRecord<Integer, String>(topic, i, Integer.toString(i)));
+ log.info("Message {} sent successfully", i);
+ }
+
+ producer.flush();
+ producer.close();
+ }
+
+ private static final Logger log = LoggerFactory.getLogger(ProducerExample.class);
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Bar.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Bar.java
new file mode 100644
index 0000000..8120900
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Bar.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples.utils;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+@Data
+@ToString
+@EqualsAndHashCode
+public class Bar {
+ private boolean field1;
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Foo.java b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Foo.java
new file mode 100644
index 0000000..d584f51
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka-tests_0_9/src/test/java/org/apache/pulsar/client/kafka/compat/examples/utils/Foo.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat.examples.utils;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.avro.reflect.Nullable;
+
+@Data
+@ToString
+@EqualsAndHashCode
+public class Foo {
+ @Nullable
+ private String field1;
+ @Nullable
+ private String field2;
+ private int field3;
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index 15e23a2..06b1c6e 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -60,7 +60,7 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
+import org.apache.pulsar.client.util.MessageIdUtils;
import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarConsumerKafkaConfig;
import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index 14dd78b..cacca60 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -57,7 +57,7 @@
import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
import org.apache.pulsar.client.kafka.compat.KafkaMessageRouter;
import org.apache.pulsar.client.kafka.compat.KafkaProducerInterceptorWrapper;
-import org.apache.pulsar.client.kafka.compat.MessageIdUtils;
+import org.apache.pulsar.client.util.MessageIdUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/MessageIdUtils.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/MessageIdUtils.java
deleted file mode 100644
index d8f7680..0000000
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/MessageIdUtils.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.kafka.compat;
-
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.impl.MessageIdImpl;
-
-public class MessageIdUtils {
- public static final long getOffset(MessageId messageId) {
- MessageIdImpl msgId = (MessageIdImpl) messageId;
- long ledgerId = msgId.getLedgerId();
- long entryId = msgId.getEntryId();
-
- // Combine ledger id and entry id to form offset
- // Use less than 32 bits to represent entry id since it will get
- // rolled over way before overflowing the max int range
- long offset = (ledgerId << 28) | entryId;
- return offset;
- }
-
- public static final MessageId getMessageId(long offset) {
- // Demultiplex ledgerId and entryId from offset
- long ledgerId = offset >>> 28;
- long entryId = offset & 0x0F_FF_FF_FFL;
-
- return new MessageIdImpl(ledgerId, entryId, -1);
- }
-}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/pom.xml b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/pom.xml
new file mode 100644
index 0000000..0740e78
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/pom.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+ xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client-kafka-compat</artifactId>
+ <version>2.5.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <properties>
+ <kafka_0_9.version>0.9.0.1</kafka_0_9.version>
+ </properties>
+
+ <artifactId>pulsar-client-kafka_0_9-original</artifactId>
+ <name>Pulsar Kafka compatibility 0.9 :: API (original)</name>
+
+ <description>Kafka client library that publishes and consumes messages on Pulsar topics</description>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ <version>${kafka_0_9.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>net.jpountz.lz4</groupId>
+ <artifactId>lz4</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.lz4</groupId>
+ <artifactId>lz4-java</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
new file mode 100644
index 0000000..6d3c383
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -0,0 +1,597 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
+import org.apache.pulsar.client.kafka.compat.PulsarConsumerKafkaConfig;
+import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.MessageIdUtils;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
+
+@Slf4j
+public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListener<byte[]> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final PulsarClient client;
+
+ private final Schema<K> keySchema;
+ private final Schema<V> valueSchema;
+
+ private final String groupId;
+ private final boolean isAutoCommit;
+
+ private final ConcurrentMap<TopicPartition, org.apache.pulsar.client.api.Consumer<byte[]>> consumers = new ConcurrentHashMap<>();
+
+ private final Map<TopicPartition, Long> lastReceivedOffset = new ConcurrentHashMap<>();
+ private final Map<TopicPartition, OffsetAndMetadata> lastCommittedOffset = new ConcurrentHashMap<>();
+ private final Set<TopicPartition> unpolledPartitions = new HashSet<>();
+ private final SubscriptionInitialPosition strategy;
+
+ private volatile boolean closed = false;
+
+ private final int maxRecordsInSinglePoll;
+
+ private final Properties properties;
+
+
+ private static class QueueItem {
+ final org.apache.pulsar.client.api.Consumer<byte[]> consumer;
+ final Message<byte[]> message;
+
+ QueueItem(org.apache.pulsar.client.api.Consumer<byte[]> consumer, Message<byte[]> message) {
+ this.consumer = consumer;
+ this.message = message;
+ }
+ }
+
+ // Since a single Kafka consumer can receive from multiple topics, we need to multiplex all the different
+    // topics/partitions into a single queue
+ private final BlockingQueue<QueueItem> receivedMessages = new ArrayBlockingQueue<>(1000);
+
+ public PulsarKafkaConsumer(Map<String, Object> configs) {
+ this(new ConsumerConfig(configs), null, null);
+ }
+
+ public PulsarKafkaConsumer(Map<String, Object> configs, Deserializer<K> keyDeserializer,
+ Deserializer<V> valueDeserializer) {
+ this(new ConsumerConfig(configs),
+ new PulsarKafkaSchema<>(keyDeserializer), new PulsarKafkaSchema<>(valueDeserializer));
+ }
+
+ public PulsarKafkaConsumer(Map<String, Object> configs, Schema<K> keySchema, Schema<V> valueSchema) {
+ this(new ConsumerConfig(configs), keySchema, valueSchema);
+ }
+
+ public PulsarKafkaConsumer(Properties properties) {
+ this(new ConsumerConfig(properties), null, null);
+ }
+
+ public PulsarKafkaConsumer(Properties properties, Deserializer<K> keyDeserializer,
+ Deserializer<V> valueDeserializer) {
+ this(new ConsumerConfig(properties),
+ new PulsarKafkaSchema<>(keyDeserializer), new PulsarKafkaSchema<>(valueDeserializer));
+ }
+
+ public PulsarKafkaConsumer(Properties properties, Schema<K> keySchema, Schema<V> valueSchema) {
+ this(new ConsumerConfig(properties), keySchema, valueSchema);
+ }
+
+ @SuppressWarnings("unchecked")
+ private PulsarKafkaConsumer(ConsumerConfig consumerConfig, Schema<K> keySchema, Schema<V> valueSchema) {
+
+ if (keySchema == null) {
+ Deserializer<K> kafkaKeyDeserializer = consumerConfig.getConfiguredInstance(
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+ kafkaKeyDeserializer.configure(consumerConfig.originals(), true);
+ this.keySchema = new PulsarKafkaSchema<>(kafkaKeyDeserializer);
+ } else {
+ this.keySchema = keySchema;
+ consumerConfig.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+ }
+
+ if (valueSchema == null) {
+ Deserializer<V> kafkaValueDeserializer = consumerConfig.getConfiguredInstance(
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+ kafkaValueDeserializer.configure(consumerConfig.originals(), true);
+ this.valueSchema = new PulsarKafkaSchema<>(kafkaValueDeserializer);
+ } else {
+ this.valueSchema = valueSchema;
+ consumerConfig.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+ }
+
+ groupId = consumerConfig.getString(ConsumerConfig.GROUP_ID_CONFIG);
+ isAutoCommit = consumerConfig.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+ strategy = getStrategy(consumerConfig.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+ log.info("Offset reset strategy has been assigned value {}", strategy);
+
+ String serviceUrl = consumerConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
+
+        // This config does not exist in Kafka 0.9, so use the default value.
+ maxRecordsInSinglePoll = 1000;
+
+ this.properties = new Properties();
+ consumerConfig.originals().forEach(properties::put);
+ ClientBuilder clientBuilder = PulsarClientKafkaConfig.getClientBuilder(properties);
+ // Since this client instance is going to be used just for the consumers, we can enable Nagle to group
+        // all the acknowledgments sent to the broker within a short time frame
+ clientBuilder.enableTcpNoDelay(false);
+ try {
+ client = clientBuilder.serviceUrl(serviceUrl).build();
+ } catch (PulsarClientException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private SubscriptionInitialPosition getStrategy(final String strategy) {
+ if ("earliest".equals(strategy)) {
+ return SubscriptionInitialPosition.Earliest;
+ } else {
+ return SubscriptionInitialPosition.Latest;
+ }
+ }
+
+ @Override
+ public Set<TopicPartition> assignment() {
+ throw new UnsupportedOperationException("Cannot access the partitions assignements");
+ }
+
+ @Override
+ public Set<String> subscription() {
+ return consumers.keySet().stream().map(TopicPartition::topic).collect(Collectors.toSet());
+ }
+
+ @Override
+ public void subscribe(List<String> topics) {
+ subscribe(topics, null);
+ }
+
+ @Override
+ public void subscribe(List<String> topics, ConsumerRebalanceListener callback) {
+ List<CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>>> futures = new ArrayList<>();
+
+ List<TopicPartition> topicPartitions = new ArrayList<>();
+ try {
+ for (String topic : topics) {
+ // Create individual subscription on each partition, that way we can keep using the
+ // acknowledgeCumulative()
+ int numberOfPartitions = ((PulsarClientImpl) client).getNumberOfPartitions(topic).get();
+
+ ConsumerBuilder<byte[]> consumerBuilder = PulsarConsumerKafkaConfig.getConsumerBuilder(client, properties);
+ consumerBuilder.subscriptionType(SubscriptionType.Failover);
+ consumerBuilder.messageListener(this);
+ consumerBuilder.subscriptionName(groupId);
+ consumerBuilder.topics(topics);
+ if (numberOfPartitions > 1) {
+ // Subscribe to each partition
+ consumerBuilder.consumerName(ConsumerName.generateRandomName());
+ for (int i = 0; i < numberOfPartitions; i++) {
+ String partitionName = TopicName.get(topic).getPartition(i).toString();
+ CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.clone()
+ .topic(partitionName).subscribeAsync();
+ int partitionIndex = i;
+ TopicPartition tp = new TopicPartition(
+ TopicName.get(topic).getPartitionedTopicName(),
+ partitionIndex);
+ futures.add(future.thenApply(consumer -> {
+ log.info("Add consumer {} for partition {}", consumer, tp);
+ consumers.putIfAbsent(tp, consumer);
+ return consumer;
+ }));
+ topicPartitions.add(tp);
+ }
+ } else {
+ // Topic has a single partition
+ CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.topic(topic)
+ .subscribeAsync();
+ TopicPartition tp = new TopicPartition(
+ TopicName.get(topic).getPartitionedTopicName(),
+ 0);
+ futures.add(future.thenApply(consumer -> {
+ log.info("Add consumer {} for partition {}", consumer, tp);
+ consumers.putIfAbsent(tp, consumer);
+ return consumer;
+ }));
+ topicPartitions.add(tp);
+ }
+ }
+ unpolledPartitions.addAll(topicPartitions);
+
+ // Wait for all consumers to be ready
+ futures.forEach(CompletableFuture::join);
+
+ // Notify the listener is now owning all topics/partitions
+ if (callback != null) {
+ callback.onPartitionsAssigned(topicPartitions);
+ }
+
+ } catch (Exception e) {
+            // Close all consumers that might have been successfully created
+ futures.forEach(f -> {
+ try {
+ f.get().close();
+ } catch (Exception e1) {
+ // Ignore. Consumer already had failed
+ }
+ });
+
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void assign(List<TopicPartition> list) {
+ throw new UnsupportedOperationException("Cannot manually assign partitions");
+ }
+
+ @Override
+ public void subscribe(Pattern pattern, ConsumerRebalanceListener consumerRebalanceListener) {
+ throw new UnsupportedOperationException("Cannot subscribe with topic name pattern");
+ }
+
+ @Override
+ public void unsubscribe() {
+ consumers.values().forEach(c -> {
+ try {
+ c.unsubscribe();
+ } catch (PulsarClientException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ @Override
+ public ConsumerRecords<K, V> poll(long timeoutMillis) {
+ try {
+ QueueItem item = receivedMessages.poll(timeoutMillis, TimeUnit.MILLISECONDS);
+ if (item == null) {
+ return (ConsumerRecords<K, V>) ConsumerRecords.EMPTY;
+ }
+
+ Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();
+
+ int numberOfRecords = 0;
+
+ while (item != null) {
+ TopicName topicName = TopicName.get(item.consumer.getTopic());
+ String topic = topicName.getPartitionedTopicName();
+ int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0;
+ Message<byte[]> msg = item.message;
+ MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
+ long offset = MessageIdUtils.getOffset(msgId);
+
+ TopicPartition tp = new TopicPartition(topic, partition);
+ if (lastReceivedOffset.get(tp) == null && !unpolledPartitions.contains(tp)) {
+ log.info("When polling offsets, invalid offsets were detected. Resetting topic partition {}", tp);
+ resetOffsets(tp);
+ }
+
+ K key = getKey(topic, msg);
+ if (valueSchema instanceof PulsarKafkaSchema) {
+ ((PulsarKafkaSchema<V>) valueSchema).setTopic(topic);
+ }
+ V value = valueSchema.decode(msg.getData());
+
+ ConsumerRecord<K, V> consumerRecord = new ConsumerRecord<>(topic, partition, offset, key, value);
+
+ records.computeIfAbsent(tp, k -> new ArrayList<>()).add(consumerRecord);
+
+ // Update last offset seen by application
+ lastReceivedOffset.put(tp, offset);
+ unpolledPartitions.remove(tp);
+
+ if (++numberOfRecords >= maxRecordsInSinglePoll) {
+ break;
+ }
+
+ // Check if we have an item already available
+ item = receivedMessages.poll(0, TimeUnit.MILLISECONDS);
+ }
+
+ if (isAutoCommit && !records.isEmpty()) {
+ // Commit the offset of previously dequeued messages
+ commitAsync();
+ }
+
+ return new ConsumerRecords<>(records);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void commitSync() {
+ try {
+ doCommitOffsets(getCurrentOffsetsMap()).get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
+ try {
+ doCommitOffsets(offsets).get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void commitAsync() {
+ doCommitOffsets(getCurrentOffsetsMap());
+ }
+
+ @Override
+ public void commitAsync(OffsetCommitCallback callback) {
+ Map<TopicPartition, OffsetAndMetadata> offsets = getCurrentOffsetsMap();
+ doCommitOffsets(offsets).handle((v, throwable) -> {
+ callback.onComplete(offsets, throwable != null ? new Exception(throwable) : null);
+ return null;
+ });
+ }
+
+ @Override
+ public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
+ doCommitOffsets(offsets).handle((v, throwable) -> {
+ callback.onComplete(offsets, throwable != null ? new Exception(throwable) : null);
+ return null;
+ });
+ }
+
+ private CompletableFuture<Void> doCommitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+ offsets.forEach((topicPartition, offsetAndMetadata) -> {
+ org.apache.pulsar.client.api.Consumer<byte[]> consumer = consumers.get(topicPartition);
+ lastCommittedOffset.put(topicPartition, offsetAndMetadata);
+ futures.add(consumer.acknowledgeCumulativeAsync(MessageIdUtils.getMessageId(offsetAndMetadata.offset())));
+ });
+
+ return FutureUtil.waitForAll(futures);
+ }
+
+ private Map<TopicPartition, OffsetAndMetadata> getCurrentOffsetsMap() {
+ Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+ lastReceivedOffset.forEach((topicPartition, offset) -> {
+ OffsetAndMetadata om = new OffsetAndMetadata(offset);
+ offsets.put(topicPartition, om);
+ });
+
+ return offsets;
+ }
+
+ @Override
+ public void seek(TopicPartition partition, long offset) {
+ MessageId msgId = MessageIdUtils.getMessageId(offset);
+ org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(partition);
+ if (c == null) {
+ throw new IllegalArgumentException("Cannot seek on a partition where we are not subscribed");
+ }
+
+ try {
+ c.seek(msgId);
+ } catch (PulsarClientException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void seekToBeginning(TopicPartition... partitions) {
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+ if (partitions.length == 0) {
+ partitions = consumers.keySet().toArray(new TopicPartition[0]);
+ }
+ lastCommittedOffset.clear();
+ lastReceivedOffset.clear();
+
+ for (TopicPartition tp : partitions) {
+ org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(tp);
+ if (c == null) {
+ futures.add(FutureUtil.failedFuture(
+ new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
+ } else {
+ futures.add(c.seekAsync(MessageId.earliest));
+ }
+ }
+
+ FutureUtil.waitForAll(futures).join();
+ }
+
+ @Override
+ public void seekToEnd(TopicPartition... partitions) {
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+ if (partitions.length == 0) {
+ partitions = consumers.keySet().toArray(new TopicPartition[0]);
+ }
+ lastCommittedOffset.clear();
+ lastReceivedOffset.clear();
+
+ for (TopicPartition tp : partitions) {
+ org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(tp);
+ if (c == null) {
+ futures.add(FutureUtil.failedFuture(
+ new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
+ } else {
+ futures.add(c.seekAsync(MessageId.latest));
+ }
+ }
+
+ FutureUtil.waitForAll(futures).join();
+ }
+
+ @Override
+ public long position(TopicPartition partition) {
+ Long offset = lastReceivedOffset.get(partition);
+ if (offset == null && !unpolledPartitions.contains(partition)) {
+ return resetOffsets(partition).getValue();
+ }
+ return unpolledPartitions.contains(partition) ? 0 : offset;
+ }
+
+ @Override
+ public OffsetAndMetadata committed(TopicPartition partition) {
+ return lastCommittedOffset.get(partition);
+ }
+
+ @Override
+ public Map<MetricName, ? extends Metric> metrics() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<PartitionInfo> partitionsFor(String s) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Map<String, List<PartitionInfo>> listTopics() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void pause(TopicPartition... topicPartitions) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void resume(TopicPartition... topicPartitions) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() {
+ try {
+ closed = true;
+
+ if (isAutoCommit) {
+ commitAsync();
+ }
+
+ client.closeAsync().get(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void wakeup() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * This method is called whenever a new message is received.
+ * <p>
+ * Messages are guaranteed to be delivered in order and from the same thread for a single consumer
+ * <p>
+ * This method will only be called once for each message, unless either application or broker crashes.
+ * <p>
+ * Application is responsible of handling any exception that could be thrown while processing the message.
+ *
+ * @param consumer the consumer that received the message
+     * @param msg the message that was received
+ */
+ @Override
+ public void received(org.apache.pulsar.client.api.Consumer<byte[]> consumer, Message<byte[]> msg) {
+ // Block listener thread if the application is slowing down
+ try {
+ receivedMessages.put(new QueueItem(consumer, msg));
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ if (closed) {
+ // Consumer was closed and the thread was interrupted. Nothing to worry about here
+ } else {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private SubscriptionInitialPosition resetOffsets(final TopicPartition partition) {
+ log.info("Resetting partition {} and seeking to {} position", partition, strategy);
+ if (strategy == SubscriptionInitialPosition.Earliest) {
+ seekToBeginning(partition);
+ } else {
+ seekToEnd(partition);
+ }
+ return strategy;
+ }
+
+ @SuppressWarnings("unchecked")
+ private K getKey(String topic, Message<byte[]> msg) {
+ if (!msg.hasKey()) {
+ return null;
+ }
+
+ if (keySchema instanceof PulsarKafkaSchema) {
+ PulsarKafkaSchema<K> pulsarKafkaSchema = (PulsarKafkaSchema) keySchema;
+ Deserializer<K> kafkaDeserializer = pulsarKafkaSchema.getKafkaDeserializer();
+ if (kafkaDeserializer instanceof StringDeserializer) {
+ return (K) msg.getKey();
+ }
+ pulsarKafkaSchema.setTopic(topic);
+ }
+ // Assume base64 encoding
+ byte[] data = Base64.getDecoder().decode(msg.getKey());
+ return keySchema.decode(data);
+
+ }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
new file mode 100644
index 0000000..b853459
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.producer;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.kafka.compat.KafkaMessageRouter;
+import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
+import org.apache.pulsar.client.kafka.compat.PulsarKafkaSchema;
+import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
+import org.apache.pulsar.client.util.MessageIdUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Kafka 0.9 {@link Producer} implementation backed by a Pulsar client.
+ *
+ * Records are published to Pulsar topics; one Pulsar producer is lazily
+ * created and cached per Kafka topic. Safe for concurrent {@link #send} calls.
+ */
+public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
+
+ private final PulsarClient client;
+ private final ProducerBuilder<byte[]> pulsarProducerBuilder;
+
+ // One Pulsar producer per Kafka topic, created on first send() to that topic.
+ private final ConcurrentMap<String, org.apache.pulsar.client.api.Producer<byte[]>> producers = new ConcurrentHashMap<>();
+
+ private final Schema<K> keySchema;
+ private final Schema<V> valueSchema;
+
+ // Kafka partitioner used when a record carries no explicit partition.
+ private final Partitioner partitioner;
+ // Snapshot of partition metadata handed to the partitioner; grows as new topics are seen.
+ private volatile Cluster cluster = Cluster.empty();
+
+ private final Properties properties;
+
+ private static final Logger logger = LoggerFactory.getLogger(PulsarKafkaProducer.class);
+
+ public PulsarKafkaProducer(Map<String, Object> configs) {
+ this(new ProducerConfig(configs), null, null);
+ }
+
+ public PulsarKafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer,
+ Serializer<V> valueSerializer) {
+ this(new ProducerConfig(configs), new PulsarKafkaSchema<>(keySerializer), new PulsarKafkaSchema<>(valueSerializer));
+ }
+
+ public PulsarKafkaProducer(Map<String, Object> configs, Schema<K> keySchema, Schema<V> valueSchema) {
+ this(new ProducerConfig(configs), keySchema, valueSchema);
+ }
+
+ public PulsarKafkaProducer(Properties properties) {
+ this(new ProducerConfig(properties), null, null);
+ }
+
+ public PulsarKafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+ this(new ProducerConfig(properties), new PulsarKafkaSchema<>(keySerializer), new PulsarKafkaSchema<>(valueSerializer));
+ }
+
+ public PulsarKafkaProducer(Properties properties, Schema<K> keySchema, Schema<V> valueSchema) {
+ this(new ProducerConfig(properties), keySchema, valueSchema);
+ }
+
+ @SuppressWarnings({ "unchecked", "deprecation" })
+ private PulsarKafkaProducer(ProducerConfig producerConfig, Schema<K> keySchema, Schema<V> valueSchema) {
+
+ // When no schema is supplied, fall back to the serializers named in the Kafka config.
+ if (keySchema == null) {
+ Serializer<K> kafkaKeySerializer = producerConfig.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
+ kafkaKeySerializer.configure(producerConfig.originals(), true);
+ this.keySchema = new PulsarKafkaSchema<>(kafkaKeySerializer);
+ } else {
+ this.keySchema = keySchema;
+ producerConfig.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+ }
+
+ if (valueSchema == null) {
+ Serializer<V> kafkaValueSerializer = producerConfig.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
+ kafkaValueSerializer.configure(producerConfig.originals(), false);
+ this.valueSchema = new PulsarKafkaSchema<>(kafkaValueSerializer);
+ } else {
+ this.valueSchema = valueSchema;
+ producerConfig.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+ }
+
+ partitioner = producerConfig.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
+ partitioner.configure(producerConfig.originals());
+
+ this.properties = new Properties();
+ producerConfig.originals().forEach(properties::put);
+
+ long keepAliveIntervalMs = Long.parseLong(properties.getProperty(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "30000"));
+
+ String serviceUrl = producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
+ try {
+ // Support Kafka's ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG in ms.
+ // If passed in value is greater than Integer.MAX_VALUE in second will throw IllegalArgumentException.
+ int keepAliveInterval = Math.toIntExact(keepAliveIntervalMs / 1000);
+ client = PulsarClientKafkaConfig.getClientBuilder(properties)
+ .serviceUrl(serviceUrl)
+ .keepAliveInterval(keepAliveInterval, TimeUnit.SECONDS)
+ .build();
+ } catch (ArithmeticException e) {
+ String errorMessage = String.format("Invalid value %d for 'connections.max.idle.ms'. " +
+ "Please use a value smaller than %d000 milliseconds.", keepAliveIntervalMs, Integer.MAX_VALUE);
+ logger.error(errorMessage);
+ throw new IllegalArgumentException(errorMessage);
+ } catch (PulsarClientException e) {
+ throw new RuntimeException(e);
+ }
+
+ pulsarProducerBuilder = PulsarProducerKafkaConfig.getProducerBuilder(client, properties);
+
+ // To mimic the same batching mode as Kafka, we need to wait a very little amount of
+ // time to batch if the client is trying to send messages fast enough
+ long lingerMs = Long.parseLong(properties.getProperty(ProducerConfig.LINGER_MS_CONFIG, "1"));
+ pulsarProducerBuilder.batchingMaxPublishDelay(lingerMs, TimeUnit.MILLISECONDS);
+
+ String compressionType = properties.getProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG);
+ if ("gzip".equals(compressionType)) {
+ pulsarProducerBuilder.compressionType(CompressionType.ZLIB);
+ } else if ("lz4".equals(compressionType)) {
+ pulsarProducerBuilder.compressionType(CompressionType.LZ4);
+ }
+
+ pulsarProducerBuilder.messageRouter(new KafkaMessageRouter(lingerMs));
+
+ int sendTimeoutMillis = Integer.parseInt(properties.getProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000"));
+ pulsarProducerBuilder.sendTimeout(sendTimeoutMillis, TimeUnit.MILLISECONDS);
+
+ // Kafka blocking semantic when blockOnBufferFull=false is different from Pulsar client
+ // Pulsar throws error immediately when the queue is full and blockIfQueueFull=false
+ // Kafka, on the other hand, still blocks for "max.block.ms" time and then gives error.
+ boolean shouldBlockPulsarProducer = sendTimeoutMillis > 0;
+ pulsarProducerBuilder.blockIfQueueFull(shouldBlockPulsarProducer);
+ }
+
+
+ @Override
+ public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord) {
+ return send(producerRecord, null);
+ }
+
+ @Override
+ public Future<RecordMetadata> send(ProducerRecord<K, V> producerRecord, Callback callback) {
+ org.apache.pulsar.client.api.Producer<byte[]> producer;
+
+ try {
+ producer = producers.computeIfAbsent(producerRecord.topic(), topic -> createNewProducer(topic));
+ } catch (Exception e) {
+ // Surface producer-creation failures through both the callback and the returned future.
+ if (callback != null) {
+ callback.onCompletion(null, e);
+ }
+ CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
+ future.completeExceptionally(e);
+ return future;
+ }
+
+ TypedMessageBuilder<byte[]> messageBuilder = buildMessage(producer, producerRecord);
+
+ CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
+ messageBuilder.sendAsync().thenAccept((messageId) -> {
+ future.complete(getRecordMetadata(producerRecord.topic(), messageId));
+ }).exceptionally(ex -> {
+ future.completeExceptionally(ex);
+ return null;
+ });
+
+ future.whenComplete((recordMetadata, throwable) -> {
+ if (callback != null) {
+ // Pass the underlying failure (not the CompletionException wrapper) to the callback,
+ // mirroring the Kafka producer contract.
+ callback.onCompletion(recordMetadata, unwrapException(throwable));
+ }
+ });
+
+ return future;
+ }
+
+ /**
+ * Unwraps the wrapper added by the CompletableFuture machinery so Kafka
+ * callbacks receive the original failure. Returns null when there was none.
+ */
+ private static Exception unwrapException(Throwable throwable) {
+ if (throwable == null) {
+ return null;
+ }
+ Throwable cause = throwable.getCause() != null ? throwable.getCause() : throwable;
+ return cause instanceof Exception ? (Exception) cause : new Exception(cause);
+ }
+
+ @Override
+ public void flush() {
+ // Trigger all flushes first, then wait, so topics flush in parallel.
+ producers.values().stream()
+ .map(p -> p.flushAsync())
+ .collect(Collectors.toList())
+ .forEach(CompletableFuture::join);
+ }
+
+ @Override
+ public List<PartitionInfo> partitionsFor(String s) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Map<MetricName, ? extends Metric> metrics() {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public void close() {
+ close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void close(long timeout, TimeUnit unit) {
+ try {
+ client.closeAsync().get(timeout, unit);
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag before propagating.
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (ExecutionException | TimeoutException e) {
+ throw new RuntimeException(e);
+ } finally {
+ // Release partitioner resources regardless of how the client shutdown went.
+ // (Previously only the no-arg close() released the partitioner.)
+ partitioner.close();
+ }
+ }
+
+ private org.apache.pulsar.client.api.Producer<byte[]> createNewProducer(String topic) {
+ try {
+ // Add the partitions info for the new topic
+ cluster = addPartitionsInfo(cluster, topic);
+ return pulsarProducerBuilder.clone()
+ .topic(topic)
+ .create();
+ } catch (PulsarClientException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Add the partitions info for the new topic.
+ * Need to ensure the atomicity of the update operation.
+ */
+ private synchronized Cluster addPartitionsInfo(Cluster cluster, String topic) {
+ List<String> partitions = client.getPartitionsForTopic(topic).join();
+ // read partitions info
+ Set<PartitionInfo> partitionsInfo = new HashSet<>();
+ for (int i = 0; i < partitions.size(); i++) {
+ partitionsInfo.add(new PartitionInfo(topic, i, null, null, null));
+ }
+ // create cluster with new partitions info
+ Set<PartitionInfo> combinedPartitions = new HashSet<>();
+ if (cluster.partitionsForTopic(topic) != null) {
+ combinedPartitions.addAll(cluster.partitionsForTopic(topic));
+ }
+ combinedPartitions.addAll(partitionsInfo);
+ // Parameterized HashSet (was a raw-type construction).
+ return new Cluster(cluster.nodes(), combinedPartitions, new HashSet<>(cluster.unauthorizedTopics()));
+ }
+
+ /**
+ * Builds the Pulsar message for a Kafka record: key (string or base64),
+ * serialized value, and the target partition id as a message property.
+ */
+ private TypedMessageBuilder<byte[]> buildMessage(org.apache.pulsar.client.api.Producer<byte[]> producer, ProducerRecord<K, V> record) {
+ TypedMessageBuilder<byte[]> builder = producer.newMessage();
+
+ byte[] keyBytes = null;
+ if (record.key() != null) {
+ String key = getKey(record.topic(), record.key());
+ keyBytes = key.getBytes(StandardCharsets.UTF_8);
+ builder.key(key);
+ }
+
+ if (valueSchema instanceof PulsarKafkaSchema) {
+ ((PulsarKafkaSchema<V>) valueSchema).setTopic(record.topic());
+ }
+ byte[] value = valueSchema.encode(record.value());
+ builder.value(value);
+
+ if (record.partition() != null) {
+ // Partition was explicitly set on the record
+ builder.property(KafkaMessageRouter.PARTITION_ID, record.partition().toString());
+ } else {
+ // Get the partition id from the partitioner
+ int partition = partitioner.partition(record.topic(), record.key(), keyBytes, record.value(), value, cluster);
+ builder.property(KafkaMessageRouter.PARTITION_ID, Integer.toString(partition));
+ }
+ return builder;
+ }
+
+ /**
+ * Returns the record key as a string: String keys pass through unchanged,
+ * everything else is serialized and base64-encoded.
+ */
+ private String getKey(String topic, K key) {
+ // If key is a String, we can use it as it is, otherwise, serialize to byte[] and encode in base64
+ if (key instanceof String) {
+ return (String) key;
+ }
+ if (keySchema instanceof PulsarKafkaSchema) {
+ ((PulsarKafkaSchema) keySchema).setTopic(topic);
+ }
+ byte[] keyBytes = keySchema.encode(key);
+ return Base64.getEncoder().encodeToString(keyBytes);
+ }
+
+ /**
+ * Maps a Pulsar MessageId back to Kafka RecordMetadata, folding ledger id
+ * and entry id into a single Kafka-style offset.
+ */
+ private RecordMetadata getRecordMetadata(String topic, MessageId messageId) {
+ MessageIdImpl msgId = (MessageIdImpl) messageId;
+
+ // Combine ledger id and entry id to form offset
+ long offset = MessageIdUtils.getOffset(msgId);
+ int partition = msgId.getPartitionIndex();
+
+ TopicPartition tp = new TopicPartition(topic, partition);
+ return new RecordMetadata(tp, offset, 0L);
+ }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaMessageRouter.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaMessageRouter.java
new file mode 100644
index 0000000..d3fb915
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/KafkaMessageRouter.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.pulsar.client.api.HashingScheme;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.TopicMetadata;
+import org.apache.pulsar.client.impl.RoundRobinPartitionMessageRouterImpl;
+
+/**
+ * Message router that honors an explicit Kafka partition id attached to the
+ * message as a property, falling back to round-robin routing otherwise.
+ */
+public class KafkaMessageRouter extends RoundRobinPartitionMessageRouterImpl {
+
+ public static final String PARTITION_ID = "pulsar.partition.id";
+
+ public KafkaMessageRouter(long maxBatchingDelayMs) {
+ super(HashingScheme.JavaStringHash, ThreadLocalRandom.current().nextInt(), true, maxBatchingDelayMs);
+ }
+
+ @Override
+ public int choosePartition(Message<?> msg, TopicMetadata metadata) {
+ // An explicit partition id (set by the Kafka producer wrapper) always wins.
+ return msg.hasProperty(PARTITION_ID)
+ ? Integer.parseInt(msg.getProperty(PARTITION_ID))
+ : super.choosePartition(msg, metadata);
+ }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
new file mode 100644
index 0000000..c38d93d
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat;
+
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+
+/**
+ * Translates {@code pulsar.*} properties passed through the Kafka client
+ * configuration into a Pulsar {@link ClientBuilder}.
+ */
+public class PulsarClientKafkaConfig {
+
+ /// Config variables
+ public static final String AUTHENTICATION_CLASS = "pulsar.authentication.class";
+ public static final String AUTHENTICATION_PARAMS_MAP = "pulsar.authentication.params.map";
+ public static final String AUTHENTICATION_PARAMS_STRING = "pulsar.authentication.params.string";
+ public static final String USE_TLS = "pulsar.use.tls";
+ public static final String TLS_TRUST_CERTS_FILE_PATH = "pulsar.tls.trust.certs.file.path";
+ public static final String TLS_ALLOW_INSECURE_CONNECTION = "pulsar.tls.allow.insecure.connection";
+ public static final String TLS_HOSTNAME_VERIFICATION = "pulsar.tls.hostname.verification";
+
+ public static final String OPERATION_TIMEOUT_MS = "pulsar.operation.timeout.ms";
+ public static final String STATS_INTERVAL_SECONDS = "pulsar.stats.interval.seconds";
+ public static final String NUM_IO_THREADS = "pulsar.num.io.threads";
+
+ public static final String CONNECTIONS_PER_BROKER = "pulsar.connections.per.broker";
+
+ public static final String USE_TCP_NODELAY = "pulsar.use.tcp.nodelay";
+
+ public static final String CONCURRENT_LOOKUP_REQUESTS = "pulsar.concurrent.lookup.requests";
+ public static final String MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION = "pulsar.max.number.rejected.request.per.connection";
+
+ /**
+ * Builds a Pulsar {@link ClientBuilder} configured from the recognized
+ * {@code pulsar.*} entries of the given properties.
+ *
+ * @param properties Kafka-style properties carrying optional pulsar.* overrides
+ * @return a client builder ready for {@code serviceUrl(...).build()}
+ * @throws RuntimeException if the configured authentication class cannot be set up
+ */
+ public static ClientBuilder getClientBuilder(Properties properties) {
+ ClientBuilder clientBuilder = PulsarClient.builder();
+
+ if (properties.containsKey(AUTHENTICATION_CLASS)) {
+ String className = properties.getProperty(AUTHENTICATION_CLASS);
+ try {
+ if (properties.containsKey(AUTHENTICATION_PARAMS_STRING)) {
+ String authParamsString = (String) properties.get(AUTHENTICATION_PARAMS_STRING);
+ clientBuilder.authentication(className, authParamsString);
+ } else if (properties.containsKey(AUTHENTICATION_PARAMS_MAP)) {
+ @SuppressWarnings("unchecked")
+ Map<String, String> authParams = (Map<String, String>) properties.get(AUTHENTICATION_PARAMS_MAP);
+ clientBuilder.authentication(className, authParams);
+ } else {
+ @SuppressWarnings("unchecked")
+ Class<Authentication> clazz = (Class<Authentication>) Class.forName(className);
+ // Class.newInstance() is deprecated and propagates undeclared checked
+ // exceptions; use the canonical reflective instantiation instead.
+ Authentication auth = clazz.getDeclaredConstructor().newInstance();
+ clientBuilder.authentication(auth);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ clientBuilder.enableTls(Boolean.parseBoolean(properties.getProperty(USE_TLS, "false")));
+ clientBuilder.allowTlsInsecureConnection(
+ Boolean.parseBoolean(properties.getProperty(TLS_ALLOW_INSECURE_CONNECTION, "false")));
+ clientBuilder.enableTlsHostnameVerification(
+ Boolean.parseBoolean(properties.getProperty(TLS_HOSTNAME_VERIFICATION, "false")));
+
+ if (properties.containsKey(TLS_TRUST_CERTS_FILE_PATH)) {
+ clientBuilder.tlsTrustCertsFilePath(properties.getProperty(TLS_TRUST_CERTS_FILE_PATH));
+ }
+
+ if (properties.containsKey(OPERATION_TIMEOUT_MS)) {
+ clientBuilder.operationTimeout(Integer.parseInt(properties.getProperty(OPERATION_TIMEOUT_MS)),
+ TimeUnit.MILLISECONDS);
+ }
+
+ if (properties.containsKey(STATS_INTERVAL_SECONDS)) {
+ clientBuilder.statsInterval(Integer.parseInt(properties.getProperty(STATS_INTERVAL_SECONDS)),
+ TimeUnit.SECONDS);
+ }
+
+ if (properties.containsKey(NUM_IO_THREADS)) {
+ clientBuilder.ioThreads(Integer.parseInt(properties.getProperty(NUM_IO_THREADS)));
+ }
+
+ if (properties.containsKey(CONNECTIONS_PER_BROKER)) {
+ clientBuilder.connectionsPerBroker(Integer.parseInt(properties.getProperty(CONNECTIONS_PER_BROKER)));
+ }
+
+ if (properties.containsKey(USE_TCP_NODELAY)) {
+ clientBuilder.enableTcpNoDelay(Boolean.parseBoolean(properties.getProperty(USE_TCP_NODELAY)));
+ }
+
+ if (properties.containsKey(CONCURRENT_LOOKUP_REQUESTS)) {
+ clientBuilder.maxConcurrentLookupRequests(Integer.parseInt(properties.getProperty(CONCURRENT_LOOKUP_REQUESTS)));
+ }
+
+ if (properties.containsKey(MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION)) {
+ clientBuilder.maxNumberOfRejectedRequestPerConnection(
+ Integer.parseInt(properties.getProperty(MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION)));
+ }
+
+ return clientBuilder;
+ }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
new file mode 100644
index 0000000..a527827
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
+
+/**
+ * Translates {@code pulsar.consumer.*} properties supplied through the Kafka
+ * configuration into a Pulsar {@link ConsumerBuilder}.
+ */
+public class PulsarConsumerKafkaConfig {
+
+ /// Config variables
+ public static final String CONSUMER_NAME = "pulsar.consumer.name";
+ public static final String RECEIVER_QUEUE_SIZE = "pulsar.consumer.receiver.queue.size";
+ public static final String ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS = "pulsar.consumer.acknowledgments.group.time.millis";
+ public static final String TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS = "pulsar.consumer.total.receiver.queue.size.across.partitions";
+ public static final String SUBSCRIPTION_TOPICS_MODE = "pulsar.consumer.subscription.topics.mode";
+
+ /**
+ * Builds a Pulsar {@link ConsumerBuilder} configured from the recognized
+ * {@code pulsar.consumer.*} entries of the given properties.
+ *
+ * @param client the Pulsar client to create the consumer builder from
+ * @param properties Kafka-style properties carrying optional pulsar.consumer.* overrides
+ * @return a consumer builder with the recognized options applied
+ * @throws IllegalArgumentException if the configured subscription topics mode is invalid
+ */
+ public static ConsumerBuilder<byte[]> getConsumerBuilder(PulsarClient client, Properties properties) {
+ ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer();
+
+ if (properties.containsKey(CONSUMER_NAME)) {
+ consumerBuilder.consumerName(properties.getProperty(CONSUMER_NAME));
+ }
+
+ if (properties.containsKey(RECEIVER_QUEUE_SIZE)) {
+ consumerBuilder.receiverQueueSize(Integer.parseInt(properties.getProperty(RECEIVER_QUEUE_SIZE)));
+ }
+
+ if (properties.containsKey(TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS)) {
+ consumerBuilder.maxTotalReceiverQueueSizeAcrossPartitions(
+ Integer.parseInt(properties.getProperty(TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS)));
+ }
+
+ if (properties.containsKey(ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS)) {
+ consumerBuilder.acknowledgmentGroupTime(
+ Long.parseLong(properties.getProperty(ACKNOWLEDGEMENTS_GROUP_TIME_MILLIS)), TimeUnit.MILLISECONDS);
+ }
+
+ if (properties.containsKey(SUBSCRIPTION_TOPICS_MODE)) {
+ RegexSubscriptionMode mode;
+ try {
+ mode = RegexSubscriptionMode.valueOf(properties.getProperty(SUBSCRIPTION_TOPICS_MODE));
+ } catch (IllegalArgumentException e) {
+ // Preserve the original cause so the offending value is not lost.
+ throw new IllegalArgumentException("Illegal subscription mode, valid values are: "
+ + Arrays.asList(RegexSubscriptionMode.values()), e);
+ }
+ consumerBuilder.subscriptionTopicsMode(mode);
+ }
+
+ return consumerBuilder;
+ }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java
new file mode 100644
index 0000000..807f482
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarKafkaSchema.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * Adapts a Kafka {@code Serializer}/{@code Deserializer} pair to the Pulsar
+ * {@link Schema} interface. The schema is advertised as raw BYTES; the Kafka
+ * (de)serializers operate on the byte payload. Either side may be null when
+ * the schema is only used for one direction.
+ */
+public class PulsarKafkaSchema<T> implements Schema<T> {
+
+ private final Serializer<T> kafkaSerializer;
+
+ private final Deserializer<T> kafkaDeserializer;
+
+ // Kafka (de)serializers receive the topic name on every call; kept mutable
+ // because one schema instance is reused across topics.
+ private String topic;
+
+ public PulsarKafkaSchema(Serializer<T> serializer) {
+ this(serializer, null);
+ }
+
+ public PulsarKafkaSchema(Deserializer<T> deserializer) {
+ this(null, deserializer);
+ }
+
+ public PulsarKafkaSchema(Serializer<T> serializer, Deserializer<T> deserializer) {
+ this.kafkaSerializer = serializer;
+ this.kafkaDeserializer = deserializer;
+ }
+
+ public Serializer<T> getKafkaSerializer() {
+ return kafkaSerializer;
+ }
+
+ public Deserializer<T> getKafkaDeserializer() {
+ return kafkaDeserializer;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ @Override
+ public byte[] encode(T message) {
+ checkArgument(kafkaSerializer != null, "Kafka serializer is not initialized yet");
+ return kafkaSerializer.serialize(this.topic, message);
+ }
+
+ @Override
+ public T decode(byte[] message) {
+ checkArgument(kafkaDeserializer != null, "Kafka deserializer is not initialized yet");
+ return kafkaDeserializer.deserialize(this.topic, message);
+ }
+
+ @Override
+ public SchemaInfo getSchemaInfo() {
+ // Payloads are opaque bytes from Pulsar's point of view.
+ return Schema.BYTES.getSchemaInfo();
+ }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
new file mode 100644
index 0000000..5a9a651
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.kafka.compat;
+
+import java.util.Properties;
+
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+
+/**
+ * Translates {@code pulsar.producer.*} properties supplied through the Kafka
+ * configuration into a Pulsar {@link ProducerBuilder}.
+ */
+public class PulsarProducerKafkaConfig {
+
+ /// Config variables
+ public static final String PRODUCER_NAME = "pulsar.producer.name";
+ public static final String INITIAL_SEQUENCE_ID = "pulsar.producer.initial.sequence.id";
+
+ public static final String MAX_PENDING_MESSAGES = "pulsar.producer.max.pending.messages";
+ public static final String MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = "pulsar.producer.max.pending.messages.across.partitions";
+ public static final String BATCHING_ENABLED = "pulsar.producer.batching.enabled";
+ public static final String BATCHING_MAX_MESSAGES = "pulsar.producer.batching.max.messages";
+
+ /**
+ * Builds a Pulsar {@link ProducerBuilder} configured from the recognized
+ * {@code pulsar.producer.*} entries of the given properties.
+ *
+ * @param client the Pulsar client to create the producer builder from
+ * @param properties Kafka-style properties carrying optional pulsar.producer.* overrides
+ * @return a producer builder with the recognized options applied
+ */
+ public static ProducerBuilder<byte[]> getProducerBuilder(PulsarClient client, Properties properties) {
+ ProducerBuilder<byte[]> builder = client.newProducer();
+
+ if (properties.containsKey(PRODUCER_NAME)) {
+ builder.producerName(properties.getProperty(PRODUCER_NAME));
+ }
+
+ if (properties.containsKey(INITIAL_SEQUENCE_ID)) {
+ builder.initialSequenceId(Long.parseLong(properties.getProperty(INITIAL_SEQUENCE_ID)));
+ }
+
+ if (properties.containsKey(MAX_PENDING_MESSAGES)) {
+ builder.maxPendingMessages(Integer.parseInt(properties.getProperty(MAX_PENDING_MESSAGES)));
+ }
+
+ if (properties.containsKey(MAX_PENDING_MESSAGES_ACROSS_PARTITIONS)) {
+ builder.maxPendingMessagesAcrossPartitions(
+ Integer.parseInt(properties.getProperty(MAX_PENDING_MESSAGES_ACROSS_PARTITIONS)));
+ }
+
+ // Batching defaults to enabled to match the Kafka producer's behavior.
+ builder.enableBatching(Boolean.parseBoolean(properties.getProperty(BATCHING_ENABLED, "true")));
+
+ if (properties.containsKey(BATCHING_MAX_MESSAGES)) {
+ builder.batchingMaxMessages(Integer.parseInt(properties.getProperty(BATCHING_MAX_MESSAGES)));
+ }
+
+ return builder;
+ }
+}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
new file mode 100644
index 0000000..4d12418
--- /dev/null
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.kafka.clients.producer;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.anyVararg;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
+import org.apache.avro.reflect.Nullable;
+import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.schema.SchemaDefinition;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
+import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.kafka.compat.PulsarClientKafkaConfig;
+import org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.testng.Assert;
+import org.testng.IObjectFactory;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+// Unit tests for the Kafka 0.9 PulsarKafkaProducer wrapper. PowerMock stubs the
+// static config-translation helpers (PulsarClientKafkaConfig /
+// PulsarProducerKafkaConfig) so no real Pulsar client or broker is needed.
+@PrepareForTest({PulsarClientKafkaConfig.class, PulsarProducerKafkaConfig.class})
+@PowerMockIgnore({"org.apache.logging.log4j.*", "org.apache.kafka.clients.producer.ProducerInterceptor"})
+public class PulsarKafkaProducerTest {
+
+ // Avro-serializable POJO used as the record value type in the Avro send test.
+ @Data
+ @ToString
+ @EqualsAndHashCode
+ public static class Foo {
+ @Nullable
+ private String field1;
+ @Nullable
+ private String field2;
+ private int field3;
+ }
+
+ // Avro-serializable POJO used as the record key type in the Avro send test.
+ @Data
+ @ToString
+ @EqualsAndHashCode
+ public static class Bar {
+ private boolean field1;
+ }
+
+ @ObjectFactory
+ // Necessary to make PowerMockito.mockStatic work with TestNG.
+ public IObjectFactory getObjectFactory() {
+ return new org.powermock.modules.testng.PowerMockObjectFactory();
+ }
+
+ // Verifies that Kafka-style properties are translated into the corresponding
+ // Pulsar builder calls: REQUEST_TIMEOUT_MS (ms) -> sendTimeout, and
+ // CONNECTIONS_MAX_IDLE_MS (ms) -> keepAliveInterval (seconds).
+ @Test
+ public void testPulsarKafkaProducer() {
+ ClientBuilder mockClientBuilder = mock(ClientBuilder.class);
+ ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class);
+ doAnswer(invocation -> {
+ // NOTE(review): the assertion message says 1000 but the asserted value is
+ // 1000000 (matching REQUEST_TIMEOUT_MS_CONFIG below); the message text
+ // looks stale and also has a typo ("suppose to be") — confirm and fix.
+ Assert.assertEquals((int)invocation.getArguments()[0], 1000000, "Send time out is suppose to be 1000.");
+ return mockProducerBuilder;
+ }).when(mockProducerBuilder).sendTimeout(anyInt(), any(TimeUnit.class));
+ doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString());
+ doAnswer(invocation -> {
+ // 1000000 ms of CONNECTIONS_MAX_IDLE_MS is expected to become 1000 seconds.
+ Assert.assertEquals((int)invocation.getArguments()[0], 1000, "Keep alive interval is suppose to be 1000.");
+ return mockClientBuilder;
+ }).when(mockClientBuilder).keepAliveInterval(anyInt(), any(TimeUnit.class));
+
+ // Replace the static factories so the producer under test receives the mocks.
+ PowerMockito.mockStatic(PulsarClientKafkaConfig.class);
+ PowerMockito.mockStatic(PulsarProducerKafkaConfig.class);
+ when(PulsarClientKafkaConfig.getClientBuilder(any(Properties.class))).thenReturn(mockClientBuilder);
+ when(PulsarProducerKafkaConfig.getProducerBuilder(any(), any())).thenReturn(mockProducerBuilder);
+
+ Properties properties = new Properties();
+ properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
+ properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
+ properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000");
+ properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");
+
+ new PulsarKafkaProducer<>(properties);
+
+ verify(mockClientBuilder, times(1)).keepAliveInterval(1000, TimeUnit.SECONDS);
+ verify(mockProducerBuilder, times(1)).sendTimeout(1000000, TimeUnit.MILLISECONDS);
+ }
+
+ // Verifies that a producer constructed with explicit Avro key/value schemas
+ // routes send() through the (mocked) Pulsar typed message builder.
+ @Test
+ public void testPulsarKafkaSendAvro() throws PulsarClientException {
+ // Arrange
+ PulsarClient mockClient = mock(PulsarClient.class);
+ ProducerBuilder mockProducerBuilder = mock(ProducerBuilder.class);
+ org.apache.pulsar.client.api.Producer mockProducer = mock(org.apache.pulsar.client.api.Producer.class);
+ ClientBuilder mockClientBuilder = mock(ClientBuilder.class);
+ CompletableFuture mockPartitionFuture = new CompletableFuture();
+ CompletableFuture mockSendAsyncFuture = new CompletableFuture();
+ TypedMessageBuilder mockTypedMessageBuilder = mock(TypedMessageBuilderImpl.class);
+
+ // Pre-complete the async results so send() resolves without a broker:
+ // empty partition list (non-partitioned topic) and a dummy message id.
+ mockPartitionFuture.complete(new ArrayList<>());
+ mockSendAsyncFuture.complete(new MessageIdImpl(1, 1, 1));
+ doReturn(mockClientBuilder).when(mockClientBuilder).serviceUrl(anyString());
+ doReturn(mockClientBuilder).when(mockClientBuilder).keepAliveInterval(anyInt(), any(TimeUnit.class));
+ doReturn(mockClient).when(mockClientBuilder).build();
+ doReturn(mockPartitionFuture).when(mockClient).getPartitionsForTopic(anyString());
+ doReturn(mockProducerBuilder).when(mockProducerBuilder).topic(anyString());
+ doReturn(mockProducerBuilder).when(mockProducerBuilder).clone();
+ doReturn(mockProducer).when(mockProducerBuilder).create();
+ doReturn(mockTypedMessageBuilder).when(mockProducer).newMessage();
+ doReturn(mockSendAsyncFuture).when(mockTypedMessageBuilder).sendAsync();
+ PowerMockito.mockStatic(PulsarClientKafkaConfig.class);
+ PowerMockito.mockStatic(PulsarProducerKafkaConfig.class);
+ when(PulsarClientKafkaConfig.getClientBuilder(any(Properties.class))).thenReturn(mockClientBuilder);
+ when(PulsarProducerKafkaConfig.getProducerBuilder(any(PulsarClient.class), any(Properties.class))).thenReturn(mockProducerBuilder);
+
+ Properties properties = new Properties();
+ properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
+ properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
+ properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000");
+ properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");
+
+ AvroSchema<Bar> barSchema = AvroSchema.of(SchemaDefinition.<Bar>builder().withPojo(Bar.class).build());
+ AvroSchema<Foo> fooSchema = AvroSchema.of(SchemaDefinition.<Foo>builder().withPojo(Foo.class).build());
+ // Act
+ PulsarKafkaProducer<Foo, Bar> pulsarKafkaProducer = new PulsarKafkaProducer<>(properties, fooSchema, barSchema);
+
+ Bar bar = new Bar();
+ bar.setField1(true);
+
+ Foo foo = new Foo();
+ foo.setField1("field1");
+ foo.setField2("field2");
+ foo.setField3(3);
+
+ pulsarKafkaProducer.send(new ProducerRecord<>("topic", 1, foo, bar));
+
+ // Verify
+ verify(mockTypedMessageBuilder).sendAsync();
+ }
+
+ // Verifies the constructor rejects CONNECTIONS_MAX_IDLE_MS values whose
+ // seconds conversion would overflow an int (> Integer.MAX_VALUE * 1000 ms).
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = "Invalid value 2147483648000 for 'connections.max.idle.ms'. Please use a value smaller than 2147483647000 milliseconds.")
+ public void testPulsarKafkaProducerKeepAliveIntervalIllegalArgumentException() {
+ Properties properties = new Properties();
+ properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
+ properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
+ // (Integer.MAX_VALUE + 1) seconds expressed in ms — one past the limit.
+ properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, Long.toString((Integer.MAX_VALUE + 1L) * 1000));
+
+ new PulsarKafkaProducer<>(properties);
+ }
+
+}