[Issue 4283][flink] construct auth when building pulsar source (#4284)

Fixes #4283

### Motivation

pulsar flink connector now uses ClientConfigData to instantiate pulsar client. Pulsar client composes Authentication which can not be serialized. In an environment with Auth there is no way to set auth in pulsar client.

### Modifications

Keep auth params away from persistence and serialization. Construct auth when building pulsar source
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
index 29cfe0e..5afba67 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
@@ -22,7 +22,6 @@
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 import java.util.function.Function;
-import java.util.Map;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.functions.RuntimeContext;
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
index 4521add..f3227d9 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
@@ -27,6 +27,7 @@
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 
@@ -268,7 +269,7 @@
     }
 
 
-    public SourceFunction<T> build() {
+    public SourceFunction<T> build() throws PulsarClientException{
         Preconditions.checkArgument(StringUtils.isNotBlank(this.clientConfigurationData.getServiceUrl()),
                 "a service url is required");
         Preconditions.checkArgument((this.consumerConfigurationData.getTopicNames() != null &&
@@ -277,9 +278,27 @@
                 "At least one topic or topics pattern is required");
         Preconditions.checkArgument(StringUtils.isNotBlank(this.consumerConfigurationData.getSubscriptionName()),
                 "a subscription name is required");
+
+        setTransientFields();
+
         return new PulsarConsumerSource<>(this);
     }
 
+    private void setTransientFields() throws PulsarClientException {
+        setAuth();
+    }
+
+    private void setAuth() throws PulsarClientException{
+        if (StringUtils.isBlank(this.clientConfigurationData.getAuthPluginClassName())
+                && this.clientConfigurationData.getAuthParams() == null || this.clientConfigurationData.getAuthParams().isEmpty())
+            return;
+
+        clientConfigurationData.setAuthentication(
+                AuthenticationFactory.create(
+                        this.clientConfigurationData.getAuthPluginClassName(),
+                        this.clientConfigurationData.getAuthParams()));
+    }
+
     /**
      * Creates a PulsarSourceBuilder.
      *
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
index cc12ba4..e7fe78c 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
@@ -22,6 +22,7 @@
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
@@ -47,7 +48,7 @@
     }
 
     @Test
-    public void testBuild() {
+    public void testBuild() throws PulsarClientException {
         SourceFunction sourceFunction = pulsarSourceBuilder
                 .serviceUrl("testServiceUrl")
                 .topic("testTopic")
@@ -59,7 +60,7 @@
 
 
     @Test
-    public void testBuildWithConfPojo() {
+    public void testBuildWithConfPojo() throws PulsarClientException {
         ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl("testServiceUrl").build();
         ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()
                 .topicNames(new HashSet<>(Arrays.asList("testTopic")))
@@ -74,7 +75,7 @@
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testBuildWithoutSettingRequiredProperties() {
+    public void testBuildWithoutSettingRequiredProperties() throws PulsarClientException {
         pulsarSourceBuilder.build();
     }
 
@@ -158,7 +159,7 @@
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testServiceUrlNullWithConfPojo() {
+    public void testServiceUrlNullWithConfPojo() throws PulsarClientException {
         ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl(null).build();
         ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()
                 .topicNames(new HashSet<String>(Arrays.asList("testServiceUrl")))
@@ -172,7 +173,7 @@
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testServiceUrlWithBlankWithConfPojo() {
+    public void testServiceUrlWithBlankWithConfPojo() throws PulsarClientException {
         ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl(StringUtils.EMPTY).build();
         ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()
                 .topicNames(new HashSet<String>(Arrays.asList("testTopic")))
@@ -186,7 +187,7 @@
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testTopicPatternWithNullWithConfPojo() {
+    public void testTopicPatternWithNullWithConfPojo() throws PulsarClientException {
         ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl("testServiceUrl").build();
         ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()
                 .topicsPattern(null)
@@ -200,7 +201,7 @@
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testSubscriptionNameWithNullWithConfPojo() {
+    public void testSubscriptionNameWithNullWithConfPojo() throws PulsarClientException {
         ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl("testServiceUrl").build();
         ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()
                 .topicNames(new HashSet<String>(Arrays.asList("testTopic")))
@@ -214,7 +215,7 @@
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testSubscriptionNameWithBlankWithConfPojo() {
+    public void testSubscriptionNameWithBlankWithConfPojo() throws PulsarClientException {
         pulsarSourceBuilder.topic(null);
         ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl("testServiceUrl").build();
         ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()
@@ -229,7 +230,7 @@
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testSubscriptionInitialPositionWithConfPojo() {
+    public void testSubscriptionInitialPositionWithConfPojo() throws PulsarClientException {
         pulsarSourceBuilder.topic(null);
         ClientConfigurationData clientConf = ClientConfigurationData.builder().serviceUrl("testServiceUrl").build();
         ConsumerConfigurationData consumerConf = ConsumerConfigurationData.builder()