[pulsar-flink]Cache Pulsar client to make it shared among tasks in a process (#5900)

* Cache Pulsar client to make it shared among tasks in a process

* code format & add tests

* fix style

Co-authored-by: Sijie Guo <guosijie@gmail.com>
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClient.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClient.java
new file mode 100644
index 0000000..613d4cc
--- /dev/null
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClient.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.streaming.connectors.pulsar;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Pulsar Client cache that enables client sharing among different flink tasks in same process.
+ */
+public class CachedPulsarClient {
+    private static final Logger LOG = LoggerFactory.getLogger(CachedPulsarClient.class);
+
+    private static int cacheSize = 5;
+
+    public static void setCacheSize(int size) {
+        cacheSize = size;
+    }
+
+    private static CacheLoader<ClientConfigurationData, PulsarClientImpl> cacheLoader =
+        new CacheLoader<ClientConfigurationData, PulsarClientImpl>() {
+            @Override
+            public PulsarClientImpl load(ClientConfigurationData key) throws Exception {
+                return createPulsarClient(key);
+            }
+        };
+
+    private static RemovalListener<ClientConfigurationData, PulsarClientImpl> removalListener = notification -> {
+        ClientConfigurationData config = notification.getKey();
+        PulsarClientImpl client = notification.getValue();
+        LOG.debug("Evicting pulsar client %s with config %s, due to %s",
+            client.toString(), config.toString(), notification.getCause().toString());
+        close(config, client);
+    };
+
+    private static LoadingCache<ClientConfigurationData, PulsarClientImpl> guavaCache =
+        CacheBuilder.newBuilder().maximumSize(cacheSize).removalListener(removalListener).build(cacheLoader);
+
+    private static PulsarClientImpl createPulsarClient(
+            ClientConfigurationData clientConfig) throws PulsarClientException {
+        PulsarClientImpl client;
+        try {
+            client = new PulsarClientImpl(clientConfig);
+            LOG.debug(String.format("Created a new instance of PulsarClientImpl for clientConf = %s",
+                clientConfig.toString()));
+        } catch (PulsarClientException e) {
+            LOG.error(String.format("Failed to create PulsarClientImpl for clientConf = %s",
+                clientConfig.toString()));
+            throw e;
+        }
+        return client;
+    }
+
+    public static PulsarClientImpl getOrCreate(ClientConfigurationData config) throws ExecutionException {
+        return guavaCache.get(config);
+    }
+
+    private static void close(ClientConfigurationData clientConfig, PulsarClientImpl client) {
+        try {
+            LOG.info(String.format("Closing the Pulsar client with conifg %s", clientConfig.toString()));
+            client.close();
+        } catch (PulsarClientException e) {
+            LOG.warn(String.format("Error while closing the Pulsar client ", clientConfig.toString()), e);
+        }
+    }
+
+    static void close(ClientConfigurationData clientConfig) {
+        guavaCache.invalidate(clientConfig);
+    }
+
+    static void clear() {
+        LOG.info("Cleaning up guava cache.");
+        guavaCache.invalidateAll();
+    }
+
+    static ConcurrentMap<ClientConfigurationData, PulsarClientImpl> getAsMap() {
+        return guavaCache.asMap();
+    }
+}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
index 215dcfd..376439d 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
@@ -186,7 +186,7 @@
     }
 
     private Producer<byte[]> createProducer() throws Exception {
-        PulsarClientImpl client = new PulsarClientImpl(clientConf);
+        PulsarClientImpl client = CachedPulsarClient.getOrCreate(clientConf);
         return client.createProducerAsync(producerConf).get();
     }
 
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
index 58ccccf..9606dfc 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
@@ -23,6 +23,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -91,7 +92,7 @@
             isCheckpointingEnabled = ((StreamingRuntimeContext) context).isCheckpointingEnabled();
         }
 
-        client = createClient();
+        client = getClient();
         consumer = createConsumer(client);
 
         isRunning = true;
@@ -188,8 +189,8 @@
         return isCheckpointingEnabled;
     }
 
-    PulsarClient createClient() throws PulsarClientException {
-        return new PulsarClientImpl(clientConfigurationData);
+    PulsarClient getClient() throws ExecutionException {
+        return CachedPulsarClient.getOrCreate(clientConfigurationData);
     }
 
     Consumer<byte[]> createConsumer(PulsarClient client) throws PulsarClientException {
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java
new file mode 100644
index 0000000..a41609f
--- /dev/null
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.streaming.connectors.pulsar;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test of {@link CachedPulsarClient}.
+ */
+public class CachedPulsarClientTest {
+
+    private static final String SERVICE_URL = "pulsar://localhost:6650";
+
+    @BeforeTest
+    public void clearCache() {
+        CachedPulsarClient.clear();
+    }
+
+    @Test
+    public void testShouldReturnSameInstanceWithSameParam() throws Exception {
+        PulsarClientImpl impl1 = Mockito.mock(PulsarClientImpl.class);
+        PulsarClientImpl impl2 = Mockito.mock(PulsarClientImpl.class);
+
+        ClientConfigurationData conf1 = new ClientConfigurationData();
+        conf1.setServiceUrl(SERVICE_URL);
+
+        ClientConfigurationData conf2 = new ClientConfigurationData();
+        conf2.setServiceUrl(SERVICE_URL);
+
+        PowerMockito.whenNew(PulsarClientImpl.class)
+            .withArguments(conf1).thenReturn(impl1);
+        PowerMockito.whenNew(PulsarClientImpl.class)
+            .withArguments(conf2).thenReturn(impl2);
+
+        PulsarClientImpl client1 = CachedPulsarClient.getOrCreate(conf1);
+        PulsarClientImpl client2 = CachedPulsarClient.getOrCreate(conf2);
+        PulsarClientImpl client3 = CachedPulsarClient.getOrCreate(conf1);
+
+        assertEquals(client1, client2);
+        assertEquals(client1, client3);
+
+        assertEquals(CachedPulsarClient.getAsMap().size(), 1);
+    }
+
+    @Test
+    public void testShouldCloseTheCorrectClient() throws Exception {
+        PulsarClientImpl impl1 = Mockito.mock(PulsarClientImpl.class);
+        PulsarClientImpl impl2 = Mockito.mock(PulsarClientImpl.class);
+
+        ClientConfigurationData conf1 = new ClientConfigurationData();
+        conf1.setServiceUrl(SERVICE_URL);
+
+        ClientConfigurationData conf2 = new ClientConfigurationData();
+        conf2.setServiceUrl(SERVICE_URL);
+        conf2.setNumIoThreads(5);
+
+        PowerMockito.whenNew(PulsarClientImpl.class)
+            .withArguments(conf1).thenReturn(impl1);
+        PowerMockito.whenNew(PulsarClientImpl.class)
+            .withArguments(conf2).thenReturn(impl2);
+
+        PulsarClientImpl client1 = CachedPulsarClient.getOrCreate(conf1);
+        PulsarClientImpl client2 = CachedPulsarClient.getOrCreate(conf2);
+
+        assertNotEquals(client1, client2);
+
+        ConcurrentMap<ClientConfigurationData, PulsarClientImpl> map1 = CachedPulsarClient.getAsMap();
+        assertEquals(map1.size(), 2);
+
+        CachedPulsarClient.close(conf2);
+
+        ConcurrentMap<ClientConfigurationData, PulsarClientImpl> map2 = CachedPulsarClient.getAsMap();
+        assertEquals(map2.size(), 1);
+
+        assertEquals(map2.values().iterator().next(), client1);
+    }
+}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
index 63e2679..88fcadc 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
@@ -308,7 +308,7 @@
         }
 
         @Override
-        PulsarClient createClient() {
+        PulsarClient getClient() {
             return mock(PulsarClient.class);
         }