[pulsar-kafka] add auto update partition support to producer/consumer (#13)
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
index a527827..09a9806 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
@@ -21,7 +21,7 @@
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
-
+import static org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig.AUTO_UPDATE_PARTITIONS;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
@@ -67,6 +67,9 @@
consumerBuilder.subscriptionTopicsMode(mode);
}
+ if (properties.containsKey(AUTO_UPDATE_PARTITIONS)) {
+ consumerBuilder.autoUpdatePartitions(Boolean.parseBoolean(properties.getProperty(AUTO_UPDATE_PARTITIONS)));
+ }
return consumerBuilder;
}
}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
index 7554faf..3315cd2 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
@@ -34,6 +34,7 @@
public static final String MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = "pulsar.producer.max.pending.messages.across.partitions";
public static final String BATCHING_ENABLED = "pulsar.producer.batching.enabled";
public static final String BATCHING_MAX_MESSAGES = "pulsar.producer.batching.max.messages";
+ public static final String AUTO_UPDATE_PARTITIONS = "pulsar.auto.update.partitions";
/**
* send operations will immediately fail with {@link ProducerQueueIsFullError} when there is no space left in
* pending queue.
@@ -66,6 +67,9 @@
producerBuilder.batchingMaxMessages(Integer.parseInt(properties.getProperty(BATCHING_MAX_MESSAGES)));
}
+ if (properties.containsKey(AUTO_UPDATE_PARTITIONS)) {
+ producerBuilder.autoUpdatePartitions(Boolean.parseBoolean(properties.getProperty(AUTO_UPDATE_PARTITIONS)));
+ }
return producerBuilder;
}
}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index 7d6e146..48481a9 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_8/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -19,18 +19,14 @@
package org.apache.kafka.clients.producer;
import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.util.Base64;
import java.util.List;
-import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.SerializationUtils;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRouter;
@@ -73,6 +69,7 @@
public static String KAFKA_KEY_MAX_QUEUE_BUFFERING_MESSAGES = "queue.buffering.max.messages";
public static String KAFKA_KEY_MAX_BATCH_MESSAGES = "batch.num.messages";
public static String KAFKA_KEY_REQUEST_TIMEOUT_MS = "request.timeout.ms";
+ public static String AUTO_UPDATE_PARTITIONS = "pulsar.auto.update.partitions";
private final ConcurrentMap<String, org.apache.pulsar.client.api.Producer<byte[]>> producers = new ConcurrentHashMap<>();
@@ -134,7 +131,9 @@
if (properties.containsKey(KAFKA_KEY_REQUEST_TIMEOUT_MS)) {
pulsarProducerBuilder.sendTimeout(config.requestTimeoutMs(), TimeUnit.MILLISECONDS);
}
-
+ if (properties.containsKey(AUTO_UPDATE_PARTITIONS)) {
+ pulsarProducerBuilder.autoUpdatePartitions(Boolean.parseBoolean(properties.getProperty(AUTO_UPDATE_PARTITIONS)));
+ }
pulsarProducerBuilder.blockIfQueueFull(blockIfQueueFull).compressionType(compressionType);
}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
index a527827..09a9806 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java
@@ -21,7 +21,7 @@
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
-
+import static org.apache.pulsar.client.kafka.compat.PulsarProducerKafkaConfig.AUTO_UPDATE_PARTITIONS;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
@@ -67,6 +67,9 @@
consumerBuilder.subscriptionTopicsMode(mode);
}
+ if (properties.containsKey(AUTO_UPDATE_PARTITIONS)) {
+ consumerBuilder.autoUpdatePartitions(Boolean.parseBoolean(properties.getProperty(AUTO_UPDATE_PARTITIONS)));
+ }
return consumerBuilder;
}
}
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
index 5a9a651..509df03 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka_0_9/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
@@ -33,6 +33,7 @@
public static final String MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = "pulsar.producer.max.pending.messages.across.partitions";
public static final String BATCHING_ENABLED = "pulsar.producer.batching.enabled";
public static final String BATCHING_MAX_MESSAGES = "pulsar.producer.batching.max.messages";
+ public static final String AUTO_UPDATE_PARTITIONS = "pulsar.auto.update.partitions";
public static ProducerBuilder<byte[]> getProducerBuilder(PulsarClient client, Properties properties) {
ProducerBuilder<byte[]> producerBuilder = client.newProducer();
@@ -60,6 +61,9 @@
producerBuilder.batchingMaxMessages(Integer.parseInt(properties.getProperty(BATCHING_MAX_MESSAGES)));
}
+ if (properties.containsKey(AUTO_UPDATE_PARTITIONS)) {
+ producerBuilder.autoUpdatePartitions(Boolean.parseBoolean(properties.getProperty(AUTO_UPDATE_PARTITIONS)));
+ }
return producerBuilder;
}
}