[Flink-Connector]Get PulsarClient from cache should always return an open instance (#6436)


diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClient.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClient.java
index 5d5715e..4de5145 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClient.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClient.java
@@ -77,7 +77,13 @@
     }
 
     public static PulsarClientImpl getOrCreate(ClientConfigurationData config) throws ExecutionException {
-        return guavaCache.get(config);
+        PulsarClientImpl instance = guavaCache.get(config);
+        if (instance.getState().get() == PulsarClientImpl.State.Open) {
+            return instance;
+        } else {
+            guavaCache.invalidate(config);
+            return guavaCache.get(config);
+        }
     }
 
     private static void close(ClientConfigurationData clientConfig, PulsarClientImpl client) {
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java
index a41609f..39cdca1 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/CachedPulsarClientTest.java
@@ -100,4 +100,29 @@
 
         assertEquals(map2.values().iterator().next(), client1);
     }
+
+    @Test
+    public void getClientFromCacheShouldAlwaysReturnAnOpenedInstance() throws Exception {
+        PulsarClientImpl impl1 = Mockito.mock(PulsarClientImpl.class);
+
+        ClientConfigurationData conf1 = new ClientConfigurationData();
+        conf1.setServiceUrl(SERVICE_URL);
+
+        PowerMockito.whenNew(PulsarClientImpl.class)
+                .withArguments(conf1).thenReturn(impl1);
+
+        PulsarClientImpl client1 = CachedPulsarClient.getOrCreate(conf1);
+
+        ConcurrentMap<ClientConfigurationData, PulsarClientImpl> map1 = CachedPulsarClient.getAsMap();
+        assertEquals(map1.size(), 1);
+
+        client1.getState().set(PulsarClientImpl.State.Closed);
+
+        PulsarClientImpl client2 = CachedPulsarClient.getOrCreate(conf1);
+
+        assertNotEquals(client1, client2);
+
+        ConcurrentMap<ClientConfigurationData, PulsarClientImpl> map2 = CachedPulsarClient.getAsMap();
+        assertEquals(map2.size(), 1);
+    }
 }