Make client keepalive interval configurable on pulsar-client-kafka (#5131)

diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
index c38d93d..8ba5ede 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java
@@ -48,6 +48,8 @@
     public static final String CONCURRENT_LOOKUP_REQUESTS = "pulsar.concurrent.lookup.requests";
     public static final String MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION = "pulsar.max.number.rejected.request.per.connection";
 
+    public static final String KEEPALIVE_INTERVAL_MS = "pulsar.keepalive.interval.ms";
+
     public static ClientBuilder getClientBuilder(Properties properties) {
         ClientBuilder clientBuilder = PulsarClient.builder();
 
@@ -112,6 +114,11 @@
                     Integer.parseInt(properties.getProperty(MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION)));
         }
 
+        if (properties.containsKey(KEEPALIVE_INTERVAL_MS)) {
+            clientBuilder.keepAliveInterval(Integer.parseInt(properties.getProperty(KEEPALIVE_INTERVAL_MS)),
+                    TimeUnit.MILLISECONDS);
+        }
+
         return clientBuilder;
     }
 }