[FLINK-24206][connector/pulsar] Close the pulsar client properly.

Pulsar Consumer would be closed in SplitReader if it reaches the end of split. The registered Consumer would also be closed in PulsarClient if we call PulsarClient.close(). This would cause race condition. We have to use PulsarClient.shutdown() which don't close Consumer instead.

This closes #17255
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderBase.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderBase.java
index d340b9f..2792895 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderBase.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/source/PulsarSourceReaderBase.java
@@ -79,10 +79,11 @@
 
     @Override
     public void close() throws Exception {
-        // Close shared pulsar resources.
-        pulsarClient.close();
-        pulsarAdmin.close();
-
+        // Close the all the consumers first.
         super.close();
+
+        // Close shared pulsar resources.
+        pulsarClient.shutdown();
+        pulsarAdmin.close();
     }
 }