Make client keepalive interval configurable on pulsar-client-kafka (#5131)
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
index c38d93d..8ba5ede 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
@@ -48,6 +48,8 @@
public static final String CONCURRENT_LOOKUP_REQUESTS = "pulsar.concurrent.lookup.requests";
public static final String MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION = "pulsar.max.number.rejected.request.per.connection";
+ public static final String KEEPALIVE_INTERVAL_MS = "pulsar.keepalive.interval.ms";
+
public static ClientBuilder getClientBuilder(Properties properties) {
ClientBuilder clientBuilder = PulsarClient.builder();
@@ -112,6 +114,11 @@
Integer.parseInt(properties.getProperty(MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION)));
}
+ if (properties.containsKey(KEEPALIVE_INTERVAL_MS)) {
+ clientBuilder.keepAliveInterval(Integer.parseInt(properties.getProperty(KEEPALIVE_INTERVAL_MS)),
+ TimeUnit.MILLISECONDS);
+ }
+
return clientBuilder;
}
}