[pulsar-kafka] add auto update partition support to producer/consumer (#13)

diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
index a527827..09a9806 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
@@ -21,7 +21,7 @@
 import java.util.Arrays;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
-
+import static org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig.AUTO_UPDATE_PARTITIONS;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
@@ -67,6 +67,9 @@
             consumerBuilder.subscriptionTopicsMode(mode);
         }
 
+        if (properties.containsKey(AUTO_UPDATE_PARTITIONS)) {
+            consumerBuilder.autoUpdatePartitions(Boolean.parseBoolean(properties.getProperty(AUTO_UPDATE_PARTITIONS)));
+        }
         return consumerBuilder;
     }
 }
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
index 7554faf..3315cd2 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
@@ -34,6 +34,7 @@
     public static final String MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = "pulsar.producer.max.pending.messages.across.partitions";
     public static final String BATCHING_ENABLED = "pulsar.producer.batching.enabled";
     public static final String BATCHING_MAX_MESSAGES = "pulsar.producer.batching.max.messages";
+    public static final String AUTO_UPDATE_PARTITIONS = "pulsar.auto.update.partitions";
     /**
      * send operations will immediately fail with {@link ProducerQueueIsFullError} when there is no space left in
      * pending queue.
@@ -66,6 +67,9 @@
             producerBuilder.batchingMaxMessages(Integer.parseInt(properties.getProperty(BATCHING_MAX_MESSAGES)));
         }
 
+        if (properties.containsKey(AUTO_UPDATE_PARTITIONS)) {
+            producerBuilder.autoUpdatePartitions(Boolean.parseBoolean(properties.getProperty(AUTO_UPDATE_PARTITIONS)));
+        }
         return producerBuilder;
     }
 }
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index 7d6e146..48481a9 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -19,18 +19,14 @@
 package org.apache.kafka.clients.producer;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.io.Serializable;
 import java.lang.reflect.Constructor;
 import java.util.Base64;
 import java.util.List;
-import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.lang3.SerializationUtils;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageRouter;
@@ -73,6 +69,7 @@
     public static String KAFKA_KEY_MAX_QUEUE_BUFFERING_MESSAGES = "queue.buffering.max.messages";
     public static String KAFKA_KEY_MAX_BATCH_MESSAGES = "batch.num.messages";
     public static String KAFKA_KEY_REQUEST_TIMEOUT_MS = "request.timeout.ms";
+    public static String AUTO_UPDATE_PARTITIONS = "pulsar.auto.update.partitions";
 
     private final ConcurrentMap<String, org.apache.pulsar.client.api.Producer<byte[]>> producers = new ConcurrentHashMap<>();
 
@@ -134,7 +131,9 @@
         if (properties.containsKey(KAFKA_KEY_REQUEST_TIMEOUT_MS)) {
             pulsarProducerBuilder.sendTimeout(config.requestTimeoutMs(), TimeUnit.MILLISECONDS);
         }
-
+        if (properties.containsKey(AUTO_UPDATE_PARTITIONS)) {
+            pulsarProducerBuilder.autoUpdatePartitions(Boolean.parseBoolean(properties.getProperty(AUTO_UPDATE_PARTITIONS)));
+        }
         pulsarProducerBuilder.blockIfQueueFull(blockIfQueueFull).compressionType(compressionType);
 
     }
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
index a527827..09a9806 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
@@ -21,7 +21,7 @@
 import java.util.Arrays;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
-
+import static org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig.AUTO_UPDATE_PARTITIONS;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
@@ -67,6 +67,9 @@
             consumerBuilder.subscriptionTopicsMode(mode);
         }
 
+        if (properties.containsKey(AUTO_UPDATE_PARTITIONS)) {
+            consumerBuilder.autoUpdatePartitions(Boolean.parseBoolean(properties.getProperty(AUTO_UPDATE_PARTITIONS)));
+        }
         return consumerBuilder;
     }
 }
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
index 5a9a651..509df03 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
@@ -33,6 +33,7 @@
     public static final String MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = "pulsar.producer.max.pending.messages.across.partitions";
     public static final String BATCHING_ENABLED = "pulsar.producer.batching.enabled";
     public static final String BATCHING_MAX_MESSAGES = "pulsar.producer.batching.max.messages";
+    public static final String AUTO_UPDATE_PARTITIONS = "pulsar.auto.update.partitions";
 
     public static ProducerBuilder<byte[]> getProducerBuilder(PulsarClient client, Properties properties) {
         ProducerBuilder<byte[]> producerBuilder = client.newProducer();
@@ -60,6 +61,9 @@
             producerBuilder.batchingMaxMessages(Integer.parseInt(properties.getProperty(BATCHING_MAX_MESSAGES)));
         }
 
+        if (properties.containsKey(AUTO_UPDATE_PARTITIONS)) {
+            producerBuilder.autoUpdatePartitions(Boolean.parseBoolean(properties.getProperty(AUTO_UPDATE_PARTITIONS)));
+        }
         return producerBuilder;
     }
 }