[pulsar-flink]Cache Pulsar client to make it shared among tasks in a process (#5900)
* Cache Pulsar client to make it shared among tasks in a process
* code format & add tests
* fix style
Co-authored-by: Sijie Guo <guosijie@gmail.com>
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClient.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClient.java
new file mode 100644
index 0000000..613d4cc
--- /dev/null
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClient.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.streaming.connectors.pulsar;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A process-wide cache of Pulsar clients keyed by client configuration, so that Flink tasks
+ * running in the same process can share one client per configuration.
+ *
+ * <p>The cache is built on first use and holds at most {@code cacheSize} entries (5 unless
+ * {@link #setCacheSize(int)} is called beforehand). The removal listener closes a client
+ * whenever its entry is removed from the cache, whether by size-based eviction or by
+ * explicit invalidation.
+ *
+ * <p>NOTE(review): lookups rely on {@code ClientConfigurationData} equality; presumably it is
+ * value-based and the key is not mutated after being cached - confirm. A returned client may
+ * be shared with other tasks, so closing it directly (or having it evicted) affects every
+ * holder - confirm callers account for this.
+ */
+public class CachedPulsarClient {
+    private static final Logger LOG = LoggerFactory.getLogger(CachedPulsarClient.class);
+
+    // Read once, when the cache is first built. Volatile so that a size set by one thread is
+    // visible to whichever thread triggers the build.
+    private static volatile int cacheSize = 5;
+
+    /**
+     * Sets the maximum number of clients kept in the cache.
+     *
+     * <p>The size is applied when the cache is built on first use, so this must be called
+     * before any other method of this class touches the cache; later calls do not resize an
+     * already-built cache.
+     *
+     * @param size maximum number of cached clients; must not be negative
+     * @throws IllegalArgumentException if {@code size} is negative
+     */
+    public static void setCacheSize(int size) {
+        // Reject early: a negative size would otherwise only fail later, while building the cache.
+        if (size < 0) {
+            throw new IllegalArgumentException("cacheSize must not be negative: " + size);
+        }
+        cacheSize = size;
+    }
+
+    /** Creates a new client for a configuration that is not in the cache yet. */
+    private static final CacheLoader<ClientConfigurationData, PulsarClientImpl> CACHE_LOADER =
+        new CacheLoader<ClientConfigurationData, PulsarClientImpl>() {
+            @Override
+            public PulsarClientImpl load(ClientConfigurationData key) throws Exception {
+                return createPulsarClient(key);
+            }
+        };
+
+    /** Closes a client once its entry has been removed from the cache. */
+    private static final RemovalListener<ClientConfigurationData, PulsarClientImpl> REMOVAL_LISTENER =
+        notification -> {
+            ClientConfigurationData config = notification.getKey();
+            PulsarClientImpl client = notification.getValue();
+            // SLF4J substitutes "{}" placeholders; printf-style "%s" would be logged verbatim.
+            LOG.debug("Evicting pulsar client {} with config {}, due to {}",
+                client, config, notification.getCause());
+            close(config, client);
+        };
+
+    /**
+     * Holds the cache so that it is built on first access rather than when
+     * {@link CachedPulsarClient} itself is initialized; this is what lets an earlier
+     * {@link #setCacheSize(int)} call take effect.
+     */
+    private static final class CacheHolder {
+        private static final LoadingCache<ClientConfigurationData, PulsarClientImpl> CACHE =
+            CacheBuilder.newBuilder()
+                .maximumSize(cacheSize)
+                .removalListener(REMOVAL_LISTENER)
+                .build(CACHE_LOADER);
+    }
+
+    /**
+     * Instantiates a Pulsar client for the given configuration.
+     *
+     * @param clientConfig configuration to build the client from
+     * @return the newly created client
+     * @throws PulsarClientException if the client cannot be created; logged and rethrown
+     */
+    private static PulsarClientImpl createPulsarClient(ClientConfigurationData clientConfig)
+            throws PulsarClientException {
+        try {
+            PulsarClientImpl client = new PulsarClientImpl(clientConfig);
+            LOG.debug("Created a new instance of PulsarClientImpl for clientConf = {}", clientConfig);
+            return client;
+        } catch (PulsarClientException e) {
+            // Pass the exception as the last argument so the stack trace is not lost.
+            LOG.error("Failed to create PulsarClientImpl for clientConf = {}", clientConfig, e);
+            throw e;
+        }
+    }
+
+    /**
+     * Returns the cached client for {@code config}, creating and caching one if none exists.
+     *
+     * @param config client configuration used as the cache key
+     * @return the client associated with {@code config}
+     * @throws ExecutionException if creating the client fails with a checked exception
+     */
+    public static PulsarClientImpl getOrCreate(ClientConfigurationData config) throws ExecutionException {
+        return CacheHolder.CACHE.get(config);
+    }
+
+    /**
+     * Closes {@code client}, logging instead of propagating a failure so that cache removal
+     * is never interrupted by a client that cannot be closed cleanly.
+     */
+    private static void close(ClientConfigurationData clientConfig, PulsarClientImpl client) {
+        try {
+            LOG.info("Closing the Pulsar client with config {}", clientConfig);
+            client.close();
+        } catch (PulsarClientException e) {
+            LOG.warn("Error while closing the Pulsar client with config {}", clientConfig, e);
+        }
+    }
+
+    /** Removes the entry for {@code clientConfig}; the removal listener closes its client. */
+    static void close(ClientConfigurationData clientConfig) {
+        CacheHolder.CACHE.invalidate(clientConfig);
+    }
+
+    /** Removes every entry from the cache; the removal listener closes each client. */
+    static void clear() {
+        LOG.info("Cleaning up guava cache.");
+        CacheHolder.CACHE.invalidateAll();
+    }
+
+    /** Returns the map view of the cached entries, for inspection in tests. */
+    static ConcurrentMap<ClientConfigurationData, PulsarClientImpl> getAsMap() {
+        return CacheHolder.CACHE.asMap();
+    }
+}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
index 215dcfd..376439d 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
@@ -186,7 +186,7 @@
}
private Producer<byte[]> createProducer() throws Exception {
- PulsarClientImpl client = new PulsarClientImpl(clientConf);
+ PulsarClientImpl client = CachedPulsarClient.getOrCreate(clientConf);
return client.createProducerAsync(producerConf).get();
}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
index 58ccccf..9606dfc 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
@@ -23,6 +23,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -91,7 +92,7 @@
isCheckpointingEnabled = ((StreamingRuntimeContext) context).isCheckpointingEnabled();
}
- client = createClient();
+ client = getClient();
consumer = createConsumer(client);
isRunning = true;
@@ -188,8 +189,8 @@
return isCheckpointingEnabled;
}
- PulsarClient createClient() throws PulsarClientException {
- return new PulsarClientImpl(clientConfigurationData);
+ PulsarClient getClient() throws ExecutionException {
+ return CachedPulsarClient.getOrCreate(clientConfigurationData);
}
Consumer<byte[]> createConsumer(PulsarClient client) throws PulsarClientException {
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java
new file mode 100644
index 0000000..a41609f
--- /dev/null
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.streaming.connectors.pulsar;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test of {@link CachedPulsarClient}.
+ *
+ * <p>Both tests assert exact cache sizes, so the shared static cache is emptied before every
+ * test method rather than once per TestNG test group.
+ *
+ * <p>NOTE(review): {@code PowerMockito.whenNew} normally needs the class under test to be
+ * prepared by PowerMock; nothing in this class sets that up, so the stubbed constructors may
+ * not be intercepted and real clients may be created instead - confirm.
+ */
+public class CachedPulsarClientTest {
+
+    private static final String SERVICE_URL = "pulsar://localhost:6650";
+
+    /** Empties the cache before each test so entries from one test cannot leak into another. */
+    @BeforeMethod
+    public void clearCache() {
+        CachedPulsarClient.clear();
+    }
+
+    /** Two equal configurations must resolve to one shared client and one cache entry. */
+    @Test
+    public void testShouldReturnSameInstanceWithSameParam() throws Exception {
+        PulsarClientImpl impl1 = Mockito.mock(PulsarClientImpl.class);
+        PulsarClientImpl impl2 = Mockito.mock(PulsarClientImpl.class);
+
+        ClientConfigurationData conf1 = new ClientConfigurationData();
+        conf1.setServiceUrl(SERVICE_URL);
+
+        ClientConfigurationData conf2 = new ClientConfigurationData();
+        conf2.setServiceUrl(SERVICE_URL);
+
+        PowerMockito.whenNew(PulsarClientImpl.class)
+            .withArguments(conf1).thenReturn(impl1);
+        PowerMockito.whenNew(PulsarClientImpl.class)
+            .withArguments(conf2).thenReturn(impl2);
+
+        PulsarClientImpl client1 = CachedPulsarClient.getOrCreate(conf1);
+        PulsarClientImpl client2 = CachedPulsarClient.getOrCreate(conf2);
+        PulsarClientImpl client3 = CachedPulsarClient.getOrCreate(conf1);
+
+        assertEquals(client1, client2);
+        assertEquals(client1, client3);
+
+        assertEquals(CachedPulsarClient.getAsMap().size(), 1);
+    }
+
+    /** Closing one configuration must remove only that entry and leave the other client cached. */
+    @Test
+    public void testShouldCloseTheCorrectClient() throws Exception {
+        PulsarClientImpl impl1 = Mockito.mock(PulsarClientImpl.class);
+        PulsarClientImpl impl2 = Mockito.mock(PulsarClientImpl.class);
+
+        ClientConfigurationData conf1 = new ClientConfigurationData();
+        conf1.setServiceUrl(SERVICE_URL);
+
+        // A different IO thread count makes conf2 a distinct cache key from conf1.
+        ClientConfigurationData conf2 = new ClientConfigurationData();
+        conf2.setServiceUrl(SERVICE_URL);
+        conf2.setNumIoThreads(5);
+
+        PowerMockito.whenNew(PulsarClientImpl.class)
+            .withArguments(conf1).thenReturn(impl1);
+        PowerMockito.whenNew(PulsarClientImpl.class)
+            .withArguments(conf2).thenReturn(impl2);
+
+        PulsarClientImpl client1 = CachedPulsarClient.getOrCreate(conf1);
+        PulsarClientImpl client2 = CachedPulsarClient.getOrCreate(conf2);
+
+        assertNotEquals(client1, client2);
+
+        ConcurrentMap<ClientConfigurationData, PulsarClientImpl> map1 = CachedPulsarClient.getAsMap();
+        assertEquals(map1.size(), 2);
+
+        CachedPulsarClient.close(conf2);
+
+        ConcurrentMap<ClientConfigurationData, PulsarClientImpl> map2 = CachedPulsarClient.getAsMap();
+        assertEquals(map2.size(), 1);
+
+        assertEquals(map2.values().iterator().next(), client1);
+    }
+}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
index 63e2679..88fcadc 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java
@@ -308,7 +308,7 @@
}
@Override
- PulsarClient createClient() {
+ PulsarClient getClient() {
return mock(PulsarClient.class);
}