issue#3939 : Allow client authentication from pulsar-flink package (#3949)

Problem:
========
pulsar-flink module (aka flink connector) internally uses pulsar-client. Though the pulsar client allows setting tokens in the client builder, the flink connector does not provide a way to pass authentication token to the pulsar client it uses internally.

Solution:
========
Accept authetication information as an input in pulsar-flink module. Pass this authentication information to pulsar-client.
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
index ca34327..644c8e9 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
@@ -27,6 +27,7 @@
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Authentication;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,14 +47,16 @@
 
     protected final String serviceUrl;
     protected final String topicName;
+    private final Authentication authentication;
     protected SerializationSchema<T> serializationSchema;
 
-    protected BasePulsarOutputFormat(final String serviceUrl, final String topicName) {
+    protected BasePulsarOutputFormat(final String serviceUrl, final String topicName, final Authentication authentication) {
         Preconditions.checkArgument(StringUtils.isNotBlank(serviceUrl), "serviceUrl cannot be blank.");
         Preconditions.checkArgument(StringUtils.isNotBlank(topicName),  "topicName cannot be blank.");
 
         this.serviceUrl = serviceUrl;
         this.topicName = topicName;
+        this.authentication = authentication;
 
         LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic: {}", this.topicName);
     }
@@ -65,7 +68,7 @@
 
     @Override
     public void open(int taskNumber, int numTasks) throws IOException {
-        this.producer = getProducerInstance(serviceUrl, topicName);
+        this.producer = getProducerInstance(serviceUrl, topicName, authentication);
 
         this.failureCallback = cause -> {
             LOG.error("Error while sending record to Pulsar: " + cause.getMessage(), cause);
@@ -85,11 +88,12 @@
 
     }
 
-    private static Producer<byte[]> getProducerInstance(String serviceUrl, String topicName) throws PulsarClientException {
+    private static Producer<byte[]> getProducerInstance(String serviceUrl, String topicName, Authentication authentication)
+            throws PulsarClientException {
         if(producer == null){
             synchronized (PulsarOutputFormat.class) {
                 if(producer == null){
-                    producer = Preconditions.checkNotNull(createPulsarProducer(serviceUrl, topicName),
+                    producer = Preconditions.checkNotNull(createPulsarProducer(serviceUrl, topicName, authentication),
                             "Pulsar producer cannot be null.");
                 }
             }
@@ -97,9 +101,10 @@
         return producer;
     }
 
-    private static Producer<byte[]> createPulsarProducer(String serviceUrl, String topicName) throws PulsarClientException {
+    private static Producer<byte[]> createPulsarProducer(String serviceUrl, String topicName, Authentication authentication)
+            throws PulsarClientException {
         try {
-            PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
+            PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).authentication(authentication).build();
             return client.newProducer().topic(topicName).create();
         } catch (PulsarClientException e) {
             LOG.error("Pulsar producer cannot be created.", e);
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java
index d15dfe7..52484ef 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java
@@ -20,6 +20,7 @@
 
 import org.apache.avro.specific.SpecificRecord;
 import org.apache.flink.batch.connectors.pulsar.serialization.AvroSerializationSchema;
+import org.apache.pulsar.client.api.Authentication;
 
 /**
  * Pulsar Avro Output Format to write Flink DataSets into a Pulsar topic in Avro format.
@@ -28,8 +29,8 @@
 
     private static final long serialVersionUID = -6794070714728773530L;
 
-    public PulsarAvroOutputFormat(String serviceUrl, String topicName) {
-        super(serviceUrl, topicName);
+    public PulsarAvroOutputFormat(String serviceUrl, String topicName, Authentication authentication) {
+        super(serviceUrl, topicName, authentication);
         this.serializationSchema = new AvroSerializationSchema();
     }
 
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
index adae9f7..d36a260 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.batch.connectors.pulsar.serialization.CsvSerializationSchema;
+import org.apache.pulsar.client.api.Authentication;
 
 /**
  * Pulsar Csv Output Format to write Flink DataSets into a Pulsar topic in Csv format.
@@ -28,8 +29,8 @@
 
     private static final long serialVersionUID = -4461671510903404196L;
 
-    public PulsarCsvOutputFormat(String serviceUrl, String topicName) {
-        super(serviceUrl, topicName);
+    public PulsarCsvOutputFormat(String serviceUrl, String topicName, Authentication authentication) {
+        super(serviceUrl, topicName, authentication);
         this.serializationSchema = new CsvSerializationSchema<>();
     }
 
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java
index 3fe5baa..96d7a01 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java
@@ -19,6 +19,7 @@
 package org.apache.flink.batch.connectors.pulsar;
 
 import org.apache.flink.batch.connectors.pulsar.serialization.JsonSerializationSchema;
+import org.apache.pulsar.client.api.Authentication;
 
 /**
  * Pulsar Json Output Format to write Flink DataSets into a Pulsar topic in Json format.
@@ -27,8 +28,8 @@
 
     private static final long serialVersionUID = 8499620770848461958L;
 
-    public PulsarJsonOutputFormat(String serviceUrl, String topicName) {
-        super(serviceUrl, topicName);
+    public PulsarJsonOutputFormat(String serviceUrl, String topicName, Authentication authentication) {
+        super(serviceUrl, topicName, authentication);
         this.serializationSchema = new JsonSerializationSchema();
     }
 }
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
index 889970f..393faaf 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.util.Preconditions;
+import org.apache.pulsar.client.api.Authentication;
 
 /**
  * Pulsar Output Format to write Flink DataSets into a Pulsar topic in user-defined format.
@@ -28,8 +29,8 @@
 
     private static final long serialVersionUID = 2997027580167793000L;
 
-    public PulsarOutputFormat(String serviceUrl, String topicName, final SerializationSchema<T> serializationSchema) {
-        super(serviceUrl, topicName);
+    public PulsarOutputFormat(String serviceUrl, String topicName, Authentication authentication, final SerializationSchema<T> serializationSchema) {
+        super(serviceUrl, topicName, authentication);
         Preconditions.checkNotNull(serializationSchema, "serializationSchema cannot be null.");
         this.serializationSchema = serializationSchema;
     }
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
index c0d3905..55eb619 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
@@ -41,6 +41,7 @@
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,6 +66,12 @@
     protected final String defaultTopicName;
 
     /**
+     * Pulsar client will use this authentication information, if required.
+     */
+    private final Authentication authentication;
+
+
+    /**
      * (Serializable) SerializationSchema for turning objects used with Flink into.
      * byte[] for Pulsar.
      */
@@ -121,13 +128,15 @@
 
     public FlinkPulsarProducer(String serviceUrl,
                                String defaultTopicName,
+                               Authentication authentication,
                                SerializationSchema<IN> serializationSchema,
                                PulsarKeyExtractor<IN> keyExtractor) {
-        this(serviceUrl, defaultTopicName, serializationSchema, keyExtractor, null);
+        this(serviceUrl, defaultTopicName, authentication, serializationSchema, keyExtractor, null);
     }
 
     public FlinkPulsarProducer(String serviceUrl,
                                String defaultTopicName,
+                               Authentication authentication,
                                SerializationSchema<IN> serializationSchema,
                                PulsarKeyExtractor<IN> keyExtractor,
                                Map<String, Object> producerConfig) {
@@ -135,6 +144,7 @@
         checkArgument(StringUtils.isNotBlank(defaultTopicName), "TopicName cannot be blank");
         this.serviceUrl = serviceUrl;
         this.defaultTopicName = defaultTopicName;
+        this.authentication = authentication;
         this.schema = checkNotNull(serializationSchema, "Serialization Schema not set");
         this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor);
         ClosureCleaner.ensureSerializable(serializationSchema);
@@ -190,7 +200,7 @@
     }
 
     private Producer<byte[]> createProducer() throws Exception {
-        PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
+        PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).authentication(authentication).build();
         ProducerBuilder<byte[]> producerBuilder = client.newProducer();
         if (producerConfig != null) {
             producerBuilder = producerBuilder.loadConf(producerConfig);
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
index b370345..20999fd 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
@@ -36,6 +36,7 @@
 import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
+import org.apache.pulsar.client.api.Authentication;
 
 /**
  * An append-only table sink to emit a streaming table as a Pulsar stream that serializes data in Avro format.
@@ -44,6 +45,7 @@
 
     protected final String serviceUrl;
     protected final String topic;
+    protected final Authentication authentication;
     protected final String routingKeyFieldName;
     protected SerializationSchema<Row> serializationSchema;
     protected String[] fieldNames;
@@ -56,16 +58,17 @@
      *
      * @param serviceUrl          pulsar service url
      * @param topic               topic in pulsar to which table is written
-     * @param producerConf        producer configuration
      * @param routingKeyFieldName routing key field name
      */
     public PulsarAvroTableSink(
             String serviceUrl,
             String topic,
+            Authentication authentication,
             String routingKeyFieldName,
             Class<? extends SpecificRecord> recordClazz) {
         this.serviceUrl = checkNotNull(serviceUrl, "Service url not set");
         this.topic = checkNotNull(topic, "Topic is null");
+        this.authentication = checkNotNull(authentication, "authentication is null, set new AuthenticationDisabled() instead");
         this.routingKeyFieldName = routingKeyFieldName;
         this.recordClazz = recordClazz;
     }
@@ -78,6 +81,7 @@
         return new FlinkPulsarProducer<Row>(
                 serviceUrl,
                 topic,
+                authentication,
                 serializationSchema,
                 keyExtractor);
     }
@@ -110,7 +114,7 @@
 
     @Override
     public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
-        PulsarAvroTableSink sink = new PulsarAvroTableSink(serviceUrl, topic, routingKeyFieldName, recordClazz);
+        PulsarAvroTableSink sink = new PulsarAvroTableSink(serviceUrl, topic, authentication, routingKeyFieldName, recordClazz);
 
         sink.fieldNames = checkNotNull(fieldNames, "Field names are null");
         sink.fieldTypes = checkNotNull(fieldTypes, "Field types are null");
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
index 6479bf0..5af82bc 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
@@ -33,6 +33,7 @@
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.Authentication;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,6 +59,7 @@
     private final int messageReceiveTimeoutMs = 100;
     private final String serviceUrl;
     private final Set<String> topicNames;
+    private final Authentication authentication;
     private final Pattern topicsPattern;
     private final String subscriptionName;
     private final DeserializationSchema<T> deserializer;
@@ -75,6 +77,7 @@
     PulsarConsumerSource(PulsarSourceBuilder<T> builder) {
         super(MessageId.class);
         this.serviceUrl = builder.serviceUrl;
+        this.authentication = builder.authentication;
         this.topicNames = builder.topicNames;
         this.topicsPattern = builder.topicsPattern;
         this.deserializer = builder.deserializationSchema;
@@ -191,6 +194,7 @@
     PulsarClient createClient() throws PulsarClientException {
         return PulsarClient.builder()
             .serviceUrl(serviceUrl)
+            .authentication(authentication)
             .build();
     }
 
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java
index 45c2642..c37250d 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java
@@ -22,6 +22,7 @@
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.formats.json.JsonRowSerializationSchema;
 import org.apache.flink.types.Row;
+import org.apache.pulsar.client.api.Authentication;
 
 /**
  * Base class for {@link PulsarTableSink} that serializes data in JSON format.
@@ -33,14 +34,15 @@
      *
      * @param serviceUrl          pulsar service url
      * @param topic               topic in pulsar to which table is written
-     * @param producerConf        producer configuration
+     * @param authentication      authetication info required by pulsar client
      * @param routingKeyFieldName routing key field name
      */
     public PulsarJsonTableSink(
             String serviceUrl,
             String topic,
+            Authentication authentication,
             String routingKeyFieldName) {
-        super(serviceUrl, topic, routingKeyFieldName);
+        super(serviceUrl, topic, authentication, routingKeyFieldName);
     }
 
     @Override
@@ -53,6 +55,7 @@
         return new PulsarJsonTableSink(
                 serviceUrl,
                 topic,
+                authentication,
                 routingKeyFieldName);
     }
 }
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
index 3f30390..3b78495 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
@@ -23,12 +23,16 @@
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.util.Preconditions;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.PulsarClientException;
 
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.regex.Pattern;
+import java.util.Map;
 
 /**
  * A class for building a pulsar source.
@@ -43,6 +47,7 @@
     final DeserializationSchema<T> deserializationSchema;
     String serviceUrl = SERVICE_URL;
     final Set<String> topicNames = new TreeSet<>();
+    Authentication authentication;
     Pattern topicsPattern;
     String subscriptionName = "flink-sub";
     long acknowledgementBatchSize = ACKNOWLEDGEMENT_BATCH_SIZE;
@@ -163,6 +168,62 @@
         throw new IllegalArgumentException("acknowledgementBatchSize can only take values > 0 and <= " + MAX_ACKNOWLEDGEMENT_BATCH_SIZE);
     }
 
+    /**
+     * Set the authentication provider to use in the Pulsar client instance.
+     *
+     * @param authentication an instance of the {@link Authentication} provider already constructed
+     * @return this builder
+     */
+    public PulsarSourceBuilder<T> authentication(Authentication authentication) {
+        Preconditions.checkArgument(authentication != null,
+                "authentication instance can not be null, use new AuthenticationDisabled() to disable authentication");
+        this.authentication = authentication;
+        return this;
+    }
+
+    /**
+     * Configure the authentication provider to use in the Pulsar client instance
+     *
+     * @param authPluginClassName
+     *            name of the Authentication-Plugin to use
+     * @param authParamsString
+     *            string which represents parameters for the Authentication-Plugin, e.g., "key1:val1,key2:val2"
+     * @return this builder
+     * @throws PulsarClientException.UnsupportedAuthenticationException
+     *             failed to instantiate specified Authentication-Plugin
+     */
+    public PulsarSourceBuilder<T> authentication(String authPluginClassName, String authParamsString)
+            throws PulsarClientException.UnsupportedAuthenticationException {
+        Preconditions.checkArgument(StringUtils.isNotBlank(authPluginClassName),
+                "Authentication-Plugin class name can not be blank");
+        Preconditions.checkArgument(StringUtils.isNotBlank(authParamsString),
+                "Authentication-Plugin parameters can not be blank");
+        this.authentication = AuthenticationFactory.create(authPluginClassName, authParamsString);
+        return this;
+    }
+
+    /**
+     * Configure the authentication provider to use in the Pulsar client instance
+     * using a config map.
+     *
+     * @param authPluginClassName
+     *            name of the Authentication-Plugin you want to use
+     * @param authParams
+     *            map which represents parameters for the Authentication-Plugin
+     * @return this builder
+     * @throws PulsarClientException.UnsupportedAuthenticationException
+     *             failed to instantiate specified Authentication-Plugin
+     */
+    public PulsarSourceBuilder<T> authentication(String authPluginClassName, Map<String, String> authParams)
+            throws PulsarClientException.UnsupportedAuthenticationException {
+        Preconditions.checkArgument(StringUtils.isNotBlank(authPluginClassName),
+                "Authentication-Plugin class name can not be blank");
+        Preconditions.checkArgument((authParams != null && authParams.isEmpty() == false),
+                "parameters to authentication plugin can not be null/empty");
+        this.authentication = AuthenticationFactory.create(authPluginClassName, authParams);
+        return this;
+    }
+
     public SourceFunction<T> build() {
         Preconditions.checkNotNull(serviceUrl, "a service url is required");
         Preconditions.checkArgument((topicNames != null && !topicNames.isEmpty()) || topicsPattern != null,
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
index 0fc45f7..5d20a1d 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
@@ -33,6 +33,7 @@
 import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
+import org.apache.pulsar.client.api.Authentication;
 
 /**
  * An append-only table sink to emit a streaming table as a Pulsar stream.
@@ -41,6 +42,7 @@
 
     protected final String serviceUrl;
     protected final String topic;
+    protected Authentication authentication;
     protected SerializationSchema<Row> serializationSchema;
     protected PulsarKeyExtractor<Row> keyExtractor;
     protected String[] fieldNames;
@@ -50,9 +52,11 @@
     public PulsarTableSink(
             String serviceUrl,
             String topic,
+            Authentication authentication,
             String routingKeyFieldName) {
         this.serviceUrl = checkNotNull(serviceUrl, "Service url not set");
         this.topic = checkNotNull(topic, "Topic is null");
+        this.authentication = checkNotNull(authentication, "authentication is null, set new AuthenticationDisabled() instead");
         this.routingKeyFieldName = routingKeyFieldName;
     }
 
@@ -78,6 +82,7 @@
         return new FlinkPulsarProducer<Row>(
                 serviceUrl,
                 topic,
+                authentication,
                 serializationSchema,
                 keyExtractor);
     }
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java
index 62c3b5d..bedcbda 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.flink.batch.connectors.pulsar;
 
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertNotNull;
@@ -30,28 +31,28 @@
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarAvroOutputFormatConstructorWhenServiceUrlIsNull() {
-        new PulsarAvroOutputFormat(null, "testTopic");
+        new PulsarAvroOutputFormat(null, "testTopic", new AuthenticationDisabled());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarAvroOutputFormatConstructorWhenTopicNameIsNull() {
-        new PulsarAvroOutputFormat("testServiceUrl", null);
+        new PulsarAvroOutputFormat("testServiceUrl", null, new AuthenticationDisabled());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarAvroOutputFormatConstructorWhenTopicNameIsBlank() {
-        new PulsarAvroOutputFormat("testServiceUrl", " ");
+        new PulsarAvroOutputFormat("testServiceUrl", " ", new AuthenticationDisabled());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarAvroOutputFormatConstructorWhenServiceUrlIsBlank() {
-        new PulsarAvroOutputFormat(" ", "testTopic");
+        new PulsarAvroOutputFormat(" ", "testTopic", new AuthenticationDisabled());
     }
 
     @Test
     public void testPulsarAvroOutputFormatConstructor() {
         PulsarAvroOutputFormat pulsarAvroOutputFormat =
-                new PulsarAvroOutputFormat("testServiceUrl", "testTopic");
+                new PulsarAvroOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled());
         assertNotNull(pulsarAvroOutputFormat);
     }
 }
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java
index a564a89..caccb6b 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.flink.batch.connectors.pulsar;
 
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertNotNull;
@@ -29,28 +30,28 @@
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarCsvOutputFormatConstructorWhenServiceUrlIsNull() {
-        new PulsarCsvOutputFormat(null, "testTopic");
+        new PulsarCsvOutputFormat(null, "testTopic", new AuthenticationDisabled());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarCsvOutputFormatConstructorWhenTopicNameIsNull() {
-        new PulsarCsvOutputFormat("testServiceUrl", null);
+        new PulsarCsvOutputFormat("testServiceUrl", null, new AuthenticationDisabled());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarCsvOutputFormatConstructorWhenTopicNameIsBlank() {
-        new PulsarCsvOutputFormat("testServiceUrl", " ");
+        new PulsarCsvOutputFormat("testServiceUrl", " ", new AuthenticationDisabled());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarCsvOutputFormatConstructorWhenServiceUrlIsBlank() {
-        new PulsarCsvOutputFormat(" ", "testTopic");
+        new PulsarCsvOutputFormat(" ", "testTopic", new AuthenticationDisabled());
     }
 
     @Test
     public void testPulsarCsvOutputFormatConstructor() {
         PulsarCsvOutputFormat pulsarCsvOutputFormat =
-                new PulsarCsvOutputFormat("testServiceUrl", "testTopic");
+                new PulsarCsvOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled());
         assertNotNull(pulsarCsvOutputFormat);
     }
 }
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java
index b9953cf..4ab7232 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.flink.batch.connectors.pulsar;
 
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertNotNull;
@@ -29,28 +30,28 @@
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarJsonOutputFormatConstructorWhenServiceUrlIsNull() {
-        new PulsarJsonOutputFormat(null, "testTopic");
+        new PulsarJsonOutputFormat(null, "testTopic", new AuthenticationDisabled());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarJsonOutputFormatConstructorWhenTopicNameIsNull() {
-        new PulsarJsonOutputFormat("testServiceUrl", null);
+        new PulsarJsonOutputFormat("testServiceUrl", null, new AuthenticationDisabled());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarJsonOutputFormatConstructorWhenTopicNameIsBlank() {
-        new PulsarJsonOutputFormat("testServiceUrl", " ");
+        new PulsarJsonOutputFormat("testServiceUrl", " ", new AuthenticationDisabled());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarJsonOutputFormatConstructorWhenServiceUrlIsBlank() {
-        new PulsarJsonOutputFormat(" ", "testTopic");
+        new PulsarJsonOutputFormat(" ", "testTopic", new AuthenticationDisabled());
     }
 
     @Test
     public void testPulsarJsonOutputFormatConstructor() {
         PulsarJsonOutputFormat pulsarJsonOutputFormat =
-                new PulsarJsonOutputFormat("testServiceUrl", "testTopic");
+                new PulsarJsonOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled());
         assertNotNull(pulsarJsonOutputFormat);
     }
 }
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
index 41cf8b2..238c49b 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
@@ -20,6 +20,7 @@
 
 import org.apache.commons.io.IOUtils;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.testng.annotations.Test;
 
 import java.io.IOException;
@@ -35,34 +36,34 @@
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarOutputFormatConstructorWhenServiceUrlIsNull() {
-        new PulsarOutputFormat(null, "testTopic", text -> text.toString().getBytes());
+        new PulsarOutputFormat(null, "testTopic", new AuthenticationDisabled(), text -> text.toString().getBytes());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarOutputFormatConstructorWhenTopicNameIsNull() {
-        new PulsarOutputFormat("testServiceUrl", null, text -> text.toString().getBytes());
+        new PulsarOutputFormat("testServiceUrl", null, new AuthenticationDisabled(), text -> text.toString().getBytes());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarOutputFormatConstructorWhenTopicNameIsBlank() {
-        new PulsarOutputFormat("testServiceUrl", " ", text -> text.toString().getBytes());
+        new PulsarOutputFormat("testServiceUrl", " ", new AuthenticationDisabled(), text -> text.toString().getBytes());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testPulsarOutputFormatConstructorWhenServiceUrlIsBlank() {
-        new PulsarOutputFormat(" ", "testTopic", text -> text.toString().getBytes());
+        new PulsarOutputFormat(" ", "testTopic", new AuthenticationDisabled(), text -> text.toString().getBytes());
     }
 
     @Test(expectedExceptions = NullPointerException.class)
     public void testPulsarOutputFormatConstructorWhenSerializationSchemaIsNull() {
-        new PulsarOutputFormat("testServiceUrl", "testTopic", null);
+        new PulsarOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled(), null);
     }
 
     @Test
     public void testPulsarOutputFormatWithStringSerializationSchema() throws IOException {
         String input = "Wolfgang Amadeus Mozart";
         PulsarOutputFormat pulsarOutputFormat =
-                new PulsarOutputFormat("testServiceUrl", "testTopic",
+                new PulsarOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled(),
                         text -> text.toString().getBytes());
         assertNotNull(pulsarOutputFormat);
         byte[] bytes = pulsarOutputFormat.serializationSchema.serialize(input);
@@ -74,7 +75,7 @@
     public void testPulsarOutputFormatWithCustomSerializationSchema() throws IOException {
         Employee employee = new Employee(1, "Test Employee", "Test Department");
         PulsarOutputFormat pulsarOutputFormat =
-                new PulsarOutputFormat("testServiceUrl", "testTopic",
+                new PulsarOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled(),
                         new EmployeeSerializationSchema());
         assertNotNull(pulsarOutputFormat);
 
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
index 85bb2ed..125ee4a 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
@@ -25,6 +25,8 @@
 import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.mockito.Mockito;
 import org.mockito.internal.util.reflection.Whitebox;
 import org.powermock.api.mockito.PowerMockito;
@@ -39,6 +41,7 @@
 public class PulsarAvroTableSinkTest {
     private static final String SERVICE_URL = "pulsar://localhost:6650";
     private static final String TOPIC_NAME = "test_topic";
+    private static final Authentication AUTHENTICATION = new AuthenticationDisabled();
     private static final String ROUTING_KEY = "name";
 
     private final String[] fieldNames = {"id", "name","start_year","end_year"};
@@ -86,13 +89,14 @@
 
     private PulsarAvroTableSink spySink() throws Exception {
 
-        PulsarAvroTableSink sink = new PulsarAvroTableSink(SERVICE_URL, TOPIC_NAME, ROUTING_KEY, NasaMission.class);
+        PulsarAvroTableSink sink = new PulsarAvroTableSink(SERVICE_URL, TOPIC_NAME, AUTHENTICATION, ROUTING_KEY, NasaMission.class);
         FlinkPulsarProducer producer = Mockito.mock(FlinkPulsarProducer.class);
         PowerMockito.whenNew(
                 FlinkPulsarProducer.class
         ).withArguments(
                 Mockito.anyString(),
                 Mockito.anyString(),
+                Mockito.any(Authentication.class),
                 Mockito.any(SerializationSchema.class),
                 Mockito.any(PulsarKeyExtractor.class)
         ).thenReturn(producer);
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
index 9ceefff..c42ae6c 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
@@ -24,6 +24,8 @@
 import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
 import org.mockito.Mockito;
 import org.mockito.internal.util.reflection.Whitebox;
 import org.powermock.api.mockito.PowerMockito;
@@ -39,6 +41,7 @@
 
     private static final String SERVICE_URL = "pulsar://localhost:6650";
     private static final String TOPIC_NAME = "test_topic";
+    private static final Authentication AUTHENTICATION = new AuthenticationDisabled();
     private static final String ROUTING_KEY = "key";
     private final String[] fieldNames = {"key", "value"};
     private final TypeInformation[] typeInformations = {
@@ -80,13 +83,14 @@
     }
 
     private PulsarJsonTableSink spySink() throws Exception {
-        PulsarJsonTableSink sink = new PulsarJsonTableSink(SERVICE_URL, TOPIC_NAME, ROUTING_KEY);
+        PulsarJsonTableSink sink = new PulsarJsonTableSink(SERVICE_URL, TOPIC_NAME, AUTHENTICATION, ROUTING_KEY);
         FlinkPulsarProducer producer = Mockito.mock(FlinkPulsarProducer.class);
         PowerMockito.whenNew(
                 FlinkPulsarProducer.class
         ).withArguments(
                 Mockito.anyString(),
                 Mockito.anyString(),
+                Mockito.any(Authentication.class),
                 Mockito.any(SerializationSchema.class),
                 Mockito.any(PulsarKeyExtractor.class)
         ).thenReturn(producer);