issue#3939 : Allow client authentication from pulsar-flink package (#3949)
Problem:
========
pulsar-flink module (aka flink connector) internally uses pulsar-client. Though the pulsar client allows setting tokens in the client builder, the flink connector does not provide a way to pass authentication token to the pulsar client it uses internally.
Solution:
========
Accept authetication information as an input in pulsar-flink module. Pass this authentication information to pulsar-client.
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
index ca34327..644c8e9 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/BasePulsarOutputFormat.java
@@ -27,6 +27,7 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Authentication;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,14 +47,16 @@
protected final String serviceUrl;
protected final String topicName;
+ private final Authentication authentication;
protected SerializationSchema<T> serializationSchema;
- protected BasePulsarOutputFormat(final String serviceUrl, final String topicName) {
+ protected BasePulsarOutputFormat(final String serviceUrl, final String topicName, final Authentication authentication) {
Preconditions.checkArgument(StringUtils.isNotBlank(serviceUrl), "serviceUrl cannot be blank.");
Preconditions.checkArgument(StringUtils.isNotBlank(topicName), "topicName cannot be blank.");
this.serviceUrl = serviceUrl;
this.topicName = topicName;
+ this.authentication = authentication;
LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic: {}", this.topicName);
}
@@ -65,7 +68,7 @@
@Override
public void open(int taskNumber, int numTasks) throws IOException {
- this.producer = getProducerInstance(serviceUrl, topicName);
+ this.producer = getProducerInstance(serviceUrl, topicName, authentication);
this.failureCallback = cause -> {
LOG.error("Error while sending record to Pulsar: " + cause.getMessage(), cause);
@@ -85,11 +88,12 @@
}
- private static Producer<byte[]> getProducerInstance(String serviceUrl, String topicName) throws PulsarClientException {
+ private static Producer<byte[]> getProducerInstance(String serviceUrl, String topicName, Authentication authentication)
+ throws PulsarClientException {
if(producer == null){
synchronized (PulsarOutputFormat.class) {
if(producer == null){
- producer = Preconditions.checkNotNull(createPulsarProducer(serviceUrl, topicName),
+ producer = Preconditions.checkNotNull(createPulsarProducer(serviceUrl, topicName, authentication),
"Pulsar producer cannot be null.");
}
}
@@ -97,9 +101,10 @@
return producer;
}
- private static Producer<byte[]> createPulsarProducer(String serviceUrl, String topicName) throws PulsarClientException {
+ private static Producer<byte[]> createPulsarProducer(String serviceUrl, String topicName, Authentication authentication)
+ throws PulsarClientException {
try {
- PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
+ PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).authentication(authentication).build();
return client.newProducer().topic(topicName).create();
} catch (PulsarClientException e) {
LOG.error("Pulsar producer cannot be created.", e);
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java
index d15dfe7..52484ef 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormat.java
@@ -20,6 +20,7 @@
import org.apache.avro.specific.SpecificRecord;
import org.apache.flink.batch.connectors.pulsar.serialization.AvroSerializationSchema;
+import org.apache.pulsar.client.api.Authentication;
/**
* Pulsar Avro Output Format to write Flink DataSets into a Pulsar topic in Avro format.
@@ -28,8 +29,8 @@
private static final long serialVersionUID = -6794070714728773530L;
- public PulsarAvroOutputFormat(String serviceUrl, String topicName) {
- super(serviceUrl, topicName);
+ public PulsarAvroOutputFormat(String serviceUrl, String topicName, Authentication authentication) {
+ super(serviceUrl, topicName, authentication);
this.serializationSchema = new AvroSerializationSchema();
}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
index adae9f7..d36a260 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormat.java
@@ -20,6 +20,7 @@
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.batch.connectors.pulsar.serialization.CsvSerializationSchema;
+import org.apache.pulsar.client.api.Authentication;
/**
* Pulsar Csv Output Format to write Flink DataSets into a Pulsar topic in Csv format.
@@ -28,8 +29,8 @@
private static final long serialVersionUID = -4461671510903404196L;
- public PulsarCsvOutputFormat(String serviceUrl, String topicName) {
- super(serviceUrl, topicName);
+ public PulsarCsvOutputFormat(String serviceUrl, String topicName, Authentication authentication) {
+ super(serviceUrl, topicName, authentication);
this.serializationSchema = new CsvSerializationSchema<>();
}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java
index 3fe5baa..96d7a01 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormat.java
@@ -19,6 +19,7 @@
package org.apache.flink.batch.connectors.pulsar;
import org.apache.flink.batch.connectors.pulsar.serialization.JsonSerializationSchema;
+import org.apache.pulsar.client.api.Authentication;
/**
* Pulsar Json Output Format to write Flink DataSets into a Pulsar topic in Json format.
@@ -27,8 +28,8 @@
private static final long serialVersionUID = 8499620770848461958L;
- public PulsarJsonOutputFormat(String serviceUrl, String topicName) {
- super(serviceUrl, topicName);
+ public PulsarJsonOutputFormat(String serviceUrl, String topicName, Authentication authentication) {
+ super(serviceUrl, topicName, authentication);
this.serializationSchema = new JsonSerializationSchema();
}
}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
index 889970f..393faaf 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
@@ -20,6 +20,7 @@
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.util.Preconditions;
+import org.apache.pulsar.client.api.Authentication;
/**
* Pulsar Output Format to write Flink DataSets into a Pulsar topic in user-defined format.
@@ -28,8 +29,8 @@
private static final long serialVersionUID = 2997027580167793000L;
- public PulsarOutputFormat(String serviceUrl, String topicName, final SerializationSchema<T> serializationSchema) {
- super(serviceUrl, topicName);
+ public PulsarOutputFormat(String serviceUrl, String topicName, Authentication authentication, final SerializationSchema<T> serializationSchema) {
+ super(serviceUrl, topicName, authentication);
Preconditions.checkNotNull(serializationSchema, "serializationSchema cannot be null.");
this.serializationSchema = serializationSchema;
}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
index c0d3905..55eb619 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
@@ -41,6 +41,7 @@
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,6 +66,12 @@
protected final String defaultTopicName;
/**
+ * Pulsar client will use this authentication information, if required.
+ */
+ private final Authentication authentication;
+
+
+ /**
* (Serializable) SerializationSchema for turning objects used with Flink into.
* byte[] for Pulsar.
*/
@@ -121,13 +128,15 @@
public FlinkPulsarProducer(String serviceUrl,
String defaultTopicName,
+ Authentication authentication,
SerializationSchema<IN> serializationSchema,
PulsarKeyExtractor<IN> keyExtractor) {
- this(serviceUrl, defaultTopicName, serializationSchema, keyExtractor, null);
+ this(serviceUrl, defaultTopicName, authentication, serializationSchema, keyExtractor, null);
}
public FlinkPulsarProducer(String serviceUrl,
String defaultTopicName,
+ Authentication authentication,
SerializationSchema<IN> serializationSchema,
PulsarKeyExtractor<IN> keyExtractor,
Map<String, Object> producerConfig) {
@@ -135,6 +144,7 @@
checkArgument(StringUtils.isNotBlank(defaultTopicName), "TopicName cannot be blank");
this.serviceUrl = serviceUrl;
this.defaultTopicName = defaultTopicName;
+ this.authentication = authentication;
this.schema = checkNotNull(serializationSchema, "Serialization Schema not set");
this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor);
ClosureCleaner.ensureSerializable(serializationSchema);
@@ -190,7 +200,7 @@
}
private Producer<byte[]> createProducer() throws Exception {
- PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
+ PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).authentication(authentication).build();
ProducerBuilder<byte[]> producerBuilder = client.newProducer();
if (producerConfig != null) {
producerBuilder = producerBuilder.loadConf(producerConfig);
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
index b370345..20999fd 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSink.java
@@ -36,6 +36,7 @@
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
+import org.apache.pulsar.client.api.Authentication;
/**
* An append-only table sink to emit a streaming table as a Pulsar stream that serializes data in Avro format.
@@ -44,6 +45,7 @@
protected final String serviceUrl;
protected final String topic;
+ protected final Authentication authentication;
protected final String routingKeyFieldName;
protected SerializationSchema<Row> serializationSchema;
protected String[] fieldNames;
@@ -56,16 +58,17 @@
*
* @param serviceUrl pulsar service url
* @param topic topic in pulsar to which table is written
- * @param producerConf producer configuration
* @param routingKeyFieldName routing key field name
*/
public PulsarAvroTableSink(
String serviceUrl,
String topic,
+ Authentication authentication,
String routingKeyFieldName,
Class<? extends SpecificRecord> recordClazz) {
this.serviceUrl = checkNotNull(serviceUrl, "Service url not set");
this.topic = checkNotNull(topic, "Topic is null");
+ this.authentication = checkNotNull(authentication, "authentication is null, set new AuthenticationDisabled() instead");
this.routingKeyFieldName = routingKeyFieldName;
this.recordClazz = recordClazz;
}
@@ -78,6 +81,7 @@
return new FlinkPulsarProducer<Row>(
serviceUrl,
topic,
+ authentication,
serializationSchema,
keyExtractor);
}
@@ -110,7 +114,7 @@
@Override
public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
- PulsarAvroTableSink sink = new PulsarAvroTableSink(serviceUrl, topic, routingKeyFieldName, recordClazz);
+ PulsarAvroTableSink sink = new PulsarAvroTableSink(serviceUrl, topic, authentication, routingKeyFieldName, recordClazz);
sink.fieldNames = checkNotNull(fieldNames, "Field names are null");
sink.fieldTypes = checkNotNull(fieldTypes, "Field types are null");
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
index 6479bf0..5af82bc 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
@@ -33,6 +33,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.Authentication;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,6 +59,7 @@
private final int messageReceiveTimeoutMs = 100;
private final String serviceUrl;
private final Set<String> topicNames;
+ private final Authentication authentication;
private final Pattern topicsPattern;
private final String subscriptionName;
private final DeserializationSchema<T> deserializer;
@@ -75,6 +77,7 @@
PulsarConsumerSource(PulsarSourceBuilder<T> builder) {
super(MessageId.class);
this.serviceUrl = builder.serviceUrl;
+ this.authentication = builder.authentication;
this.topicNames = builder.topicNames;
this.topicsPattern = builder.topicsPattern;
this.deserializer = builder.deserializationSchema;
@@ -191,6 +194,7 @@
PulsarClient createClient() throws PulsarClientException {
return PulsarClient.builder()
.serviceUrl(serviceUrl)
+ .authentication(authentication)
.build();
}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java
index 45c2642..c37250d 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSink.java
@@ -22,6 +22,7 @@
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.types.Row;
+import org.apache.pulsar.client.api.Authentication;
/**
* Base class for {@link PulsarTableSink} that serializes data in JSON format.
@@ -33,14 +34,15 @@
*
* @param serviceUrl pulsar service url
* @param topic topic in pulsar to which table is written
- * @param producerConf producer configuration
+ * @param authentication authetication info required by pulsar client
* @param routingKeyFieldName routing key field name
*/
public PulsarJsonTableSink(
String serviceUrl,
String topic,
+ Authentication authentication,
String routingKeyFieldName) {
- super(serviceUrl, topic, routingKeyFieldName);
+ super(serviceUrl, topic, authentication, routingKeyFieldName);
}
@Override
@@ -53,6 +55,7 @@
return new PulsarJsonTableSink(
serviceUrl,
topic,
+ authentication,
routingKeyFieldName);
}
}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
index 3f30390..3b78495 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
@@ -23,12 +23,16 @@
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Preconditions;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.PulsarClientException;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;
+import java.util.Map;
/**
* A class for building a pulsar source.
@@ -43,6 +47,7 @@
final DeserializationSchema<T> deserializationSchema;
String serviceUrl = SERVICE_URL;
final Set<String> topicNames = new TreeSet<>();
+ Authentication authentication;
Pattern topicsPattern;
String subscriptionName = "flink-sub";
long acknowledgementBatchSize = ACKNOWLEDGEMENT_BATCH_SIZE;
@@ -163,6 +168,62 @@
throw new IllegalArgumentException("acknowledgementBatchSize can only take values > 0 and <= " + MAX_ACKNOWLEDGEMENT_BATCH_SIZE);
}
+ /**
+ * Set the authentication provider to use in the Pulsar client instance.
+ *
+ * @param authentication an instance of the {@link Authentication} provider already constructed
+ * @return this builder
+ */
+ public PulsarSourceBuilder<T> authentication(Authentication authentication) {
+ Preconditions.checkArgument(authentication != null,
+ "authentication instance can not be null, use new AuthenticationDisabled() to disable authentication");
+ this.authentication = authentication;
+ return this;
+ }
+
+ /**
+ * Configure the authentication provider to use in the Pulsar client instance
+ *
+ * @param authPluginClassName
+ * name of the Authentication-Plugin to use
+ * @param authParamsString
+ * string which represents parameters for the Authentication-Plugin, e.g., "key1:val1,key2:val2"
+ * @return this builder
+ * @throws PulsarClientException.UnsupportedAuthenticationException
+ * failed to instantiate specified Authentication-Plugin
+ */
+ public PulsarSourceBuilder<T> authentication(String authPluginClassName, String authParamsString)
+ throws PulsarClientException.UnsupportedAuthenticationException {
+ Preconditions.checkArgument(StringUtils.isNotBlank(authPluginClassName),
+ "Authentication-Plugin class name can not be blank");
+ Preconditions.checkArgument(StringUtils.isNotBlank(authParamsString),
+ "Authentication-Plugin parameters can not be blank");
+ this.authentication = AuthenticationFactory.create(authPluginClassName, authParamsString);
+ return this;
+ }
+
+ /**
+ * Configure the authentication provider to use in the Pulsar client instance
+ * using a config map.
+ *
+ * @param authPluginClassName
+ * name of the Authentication-Plugin you want to use
+ * @param authParams
+ * map which represents parameters for the Authentication-Plugin
+ * @return this builder
+ * @throws PulsarClientException.UnsupportedAuthenticationException
+ * failed to instantiate specified Authentication-Plugin
+ */
+ public PulsarSourceBuilder<T> authentication(String authPluginClassName, Map<String, String> authParams)
+ throws PulsarClientException.UnsupportedAuthenticationException {
+ Preconditions.checkArgument(StringUtils.isNotBlank(authPluginClassName),
+ "Authentication-Plugin class name can not be blank");
+ Preconditions.checkArgument((authParams != null && authParams.isEmpty() == false),
+ "parameters to authentication plugin can not be null/empty");
+ this.authentication = AuthenticationFactory.create(authPluginClassName, authParams);
+ return this;
+ }
+
public SourceFunction<T> build() {
Preconditions.checkNotNull(serviceUrl, "a service url is required");
Preconditions.checkArgument((topicNames != null && !topicNames.isEmpty()) || topicsPattern != null,
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
index 0fc45f7..5d20a1d 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarTableSink.java
@@ -33,6 +33,7 @@
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
+import org.apache.pulsar.client.api.Authentication;
/**
* An append-only table sink to emit a streaming table as a Pulsar stream.
@@ -41,6 +42,7 @@
protected final String serviceUrl;
protected final String topic;
+ protected Authentication authentication;
protected SerializationSchema<Row> serializationSchema;
protected PulsarKeyExtractor<Row> keyExtractor;
protected String[] fieldNames;
@@ -50,9 +52,11 @@
public PulsarTableSink(
String serviceUrl,
String topic,
+ Authentication authentication,
String routingKeyFieldName) {
this.serviceUrl = checkNotNull(serviceUrl, "Service url not set");
this.topic = checkNotNull(topic, "Topic is null");
+ this.authentication = checkNotNull(authentication, "authentication is null, set new AuthenticationDisabled() instead");
this.routingKeyFieldName = routingKeyFieldName;
}
@@ -78,6 +82,7 @@
return new FlinkPulsarProducer<Row>(
serviceUrl,
topic,
+ authentication,
serializationSchema,
keyExtractor);
}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java
index 62c3b5d..bedcbda 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarAvroOutputFormatTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.flink.batch.connectors.pulsar;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.testng.annotations.Test;
import static org.testng.Assert.assertNotNull;
@@ -30,28 +31,28 @@
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarAvroOutputFormatConstructorWhenServiceUrlIsNull() {
- new PulsarAvroOutputFormat(null, "testTopic");
+ new PulsarAvroOutputFormat(null, "testTopic", new AuthenticationDisabled());
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarAvroOutputFormatConstructorWhenTopicNameIsNull() {
- new PulsarAvroOutputFormat("testServiceUrl", null);
+ new PulsarAvroOutputFormat("testServiceUrl", null, new AuthenticationDisabled());
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarAvroOutputFormatConstructorWhenTopicNameIsBlank() {
- new PulsarAvroOutputFormat("testServiceUrl", " ");
+ new PulsarAvroOutputFormat("testServiceUrl", " ", new AuthenticationDisabled());
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarAvroOutputFormatConstructorWhenServiceUrlIsBlank() {
- new PulsarAvroOutputFormat(" ", "testTopic");
+ new PulsarAvroOutputFormat(" ", "testTopic", new AuthenticationDisabled());
}
@Test
public void testPulsarAvroOutputFormatConstructor() {
PulsarAvroOutputFormat pulsarAvroOutputFormat =
- new PulsarAvroOutputFormat("testServiceUrl", "testTopic");
+ new PulsarAvroOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled());
assertNotNull(pulsarAvroOutputFormat);
}
}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java
index a564a89..caccb6b 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarCsvOutputFormatTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.flink.batch.connectors.pulsar;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.testng.annotations.Test;
import static org.testng.Assert.assertNotNull;
@@ -29,28 +30,28 @@
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarCsvOutputFormatConstructorWhenServiceUrlIsNull() {
- new PulsarCsvOutputFormat(null, "testTopic");
+ new PulsarCsvOutputFormat(null, "testTopic", new AuthenticationDisabled());
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarCsvOutputFormatConstructorWhenTopicNameIsNull() {
- new PulsarCsvOutputFormat("testServiceUrl", null);
+ new PulsarCsvOutputFormat("testServiceUrl", null, new AuthenticationDisabled());
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarCsvOutputFormatConstructorWhenTopicNameIsBlank() {
- new PulsarCsvOutputFormat("testServiceUrl", " ");
+ new PulsarCsvOutputFormat("testServiceUrl", " ", new AuthenticationDisabled());
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarCsvOutputFormatConstructorWhenServiceUrlIsBlank() {
- new PulsarCsvOutputFormat(" ", "testTopic");
+ new PulsarCsvOutputFormat(" ", "testTopic", new AuthenticationDisabled());
}
@Test
public void testPulsarCsvOutputFormatConstructor() {
PulsarCsvOutputFormat pulsarCsvOutputFormat =
- new PulsarCsvOutputFormat("testServiceUrl", "testTopic");
+ new PulsarCsvOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled());
assertNotNull(pulsarCsvOutputFormat);
}
}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java
index b9953cf..4ab7232 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarJsonOutputFormatTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.flink.batch.connectors.pulsar;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.testng.annotations.Test;
import static org.testng.Assert.assertNotNull;
@@ -29,28 +30,28 @@
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarJsonOutputFormatConstructorWhenServiceUrlIsNull() {
- new PulsarJsonOutputFormat(null, "testTopic");
+ new PulsarJsonOutputFormat(null, "testTopic", new AuthenticationDisabled());
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarJsonOutputFormatConstructorWhenTopicNameIsNull() {
- new PulsarJsonOutputFormat("testServiceUrl", null);
+ new PulsarJsonOutputFormat("testServiceUrl", null, new AuthenticationDisabled());
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarJsonOutputFormatConstructorWhenTopicNameIsBlank() {
- new PulsarJsonOutputFormat("testServiceUrl", " ");
+ new PulsarJsonOutputFormat("testServiceUrl", " ", new AuthenticationDisabled());
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarJsonOutputFormatConstructorWhenServiceUrlIsBlank() {
- new PulsarJsonOutputFormat(" ", "testTopic");
+ new PulsarJsonOutputFormat(" ", "testTopic", new AuthenticationDisabled());
}
@Test
public void testPulsarJsonOutputFormatConstructor() {
PulsarJsonOutputFormat pulsarJsonOutputFormat =
- new PulsarJsonOutputFormat("testServiceUrl", "testTopic");
+ new PulsarJsonOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled());
assertNotNull(pulsarJsonOutputFormat);
}
}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
index 41cf8b2..238c49b 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
@@ -20,6 +20,7 @@
import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.testng.annotations.Test;
import java.io.IOException;
@@ -35,34 +36,34 @@
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarOutputFormatConstructorWhenServiceUrlIsNull() {
- new PulsarOutputFormat(null, "testTopic", text -> text.toString().getBytes());
+ new PulsarOutputFormat(null, "testTopic", new AuthenticationDisabled(), text -> text.toString().getBytes());
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarOutputFormatConstructorWhenTopicNameIsNull() {
- new PulsarOutputFormat("testServiceUrl", null, text -> text.toString().getBytes());
+ new PulsarOutputFormat("testServiceUrl", null, new AuthenticationDisabled(), text -> text.toString().getBytes());
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarOutputFormatConstructorWhenTopicNameIsBlank() {
- new PulsarOutputFormat("testServiceUrl", " ", text -> text.toString().getBytes());
+ new PulsarOutputFormat("testServiceUrl", " ", new AuthenticationDisabled(), text -> text.toString().getBytes());
}
@Test(expectedExceptions = IllegalArgumentException.class)
public void testPulsarOutputFormatConstructorWhenServiceUrlIsBlank() {
- new PulsarOutputFormat(" ", "testTopic", text -> text.toString().getBytes());
+ new PulsarOutputFormat(" ", "testTopic", new AuthenticationDisabled(), text -> text.toString().getBytes());
}
@Test(expectedExceptions = NullPointerException.class)
public void testPulsarOutputFormatConstructorWhenSerializationSchemaIsNull() {
- new PulsarOutputFormat("testServiceUrl", "testTopic", null);
+ new PulsarOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled(), null);
}
@Test
public void testPulsarOutputFormatWithStringSerializationSchema() throws IOException {
String input = "Wolfgang Amadeus Mozart";
PulsarOutputFormat pulsarOutputFormat =
- new PulsarOutputFormat("testServiceUrl", "testTopic",
+ new PulsarOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled(),
text -> text.toString().getBytes());
assertNotNull(pulsarOutputFormat);
byte[] bytes = pulsarOutputFormat.serializationSchema.serialize(input);
@@ -74,7 +75,7 @@
public void testPulsarOutputFormatWithCustomSerializationSchema() throws IOException {
Employee employee = new Employee(1, "Test Employee", "Test Department");
PulsarOutputFormat pulsarOutputFormat =
- new PulsarOutputFormat("testServiceUrl", "testTopic",
+ new PulsarOutputFormat("testServiceUrl", "testTopic", new AuthenticationDisabled(),
new EmployeeSerializationSchema());
assertNotNull(pulsarOutputFormat);
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
index 85bb2ed..125ee4a 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarAvroTableSinkTest.java
@@ -25,6 +25,8 @@
import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;
import org.powermock.api.mockito.PowerMockito;
@@ -39,6 +41,7 @@
public class PulsarAvroTableSinkTest {
private static final String SERVICE_URL = "pulsar://localhost:6650";
private static final String TOPIC_NAME = "test_topic";
+ private static final Authentication AUTHENTICATION = new AuthenticationDisabled();
private static final String ROUTING_KEY = "name";
private final String[] fieldNames = {"id", "name","start_year","end_year"};
@@ -86,13 +89,14 @@
private PulsarAvroTableSink spySink() throws Exception {
- PulsarAvroTableSink sink = new PulsarAvroTableSink(SERVICE_URL, TOPIC_NAME, ROUTING_KEY, NasaMission.class);
+ PulsarAvroTableSink sink = new PulsarAvroTableSink(SERVICE_URL, TOPIC_NAME, AUTHENTICATION, ROUTING_KEY, NasaMission.class);
FlinkPulsarProducer producer = Mockito.mock(FlinkPulsarProducer.class);
PowerMockito.whenNew(
FlinkPulsarProducer.class
).withArguments(
Mockito.anyString(),
Mockito.anyString(),
+ Mockito.any(Authentication.class),
Mockito.any(SerializationSchema.class),
Mockito.any(PulsarKeyExtractor.class)
).thenReturn(producer);
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
index 9ceefff..c42ae6c 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarJsonTableSinkTest.java
@@ -24,6 +24,8 @@
import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;
import org.powermock.api.mockito.PowerMockito;
@@ -39,6 +41,7 @@
private static final String SERVICE_URL = "pulsar://localhost:6650";
private static final String TOPIC_NAME = "test_topic";
+ private static final Authentication AUTHENTICATION = new AuthenticationDisabled();
private static final String ROUTING_KEY = "key";
private final String[] fieldNames = {"key", "value"};
private final TypeInformation[] typeInformations = {
@@ -80,13 +83,14 @@
}
private PulsarJsonTableSink spySink() throws Exception {
- PulsarJsonTableSink sink = new PulsarJsonTableSink(SERVICE_URL, TOPIC_NAME, ROUTING_KEY);
+ PulsarJsonTableSink sink = new PulsarJsonTableSink(SERVICE_URL, TOPIC_NAME, AUTHENTICATION, ROUTING_KEY);
FlinkPulsarProducer producer = Mockito.mock(FlinkPulsarProducer.class);
PowerMockito.whenNew(
FlinkPulsarProducer.class
).withArguments(
Mockito.anyString(),
Mockito.anyString(),
+ Mockito.any(Authentication.class),
Mockito.any(SerializationSchema.class),
Mockito.any(PulsarKeyExtractor.class)
).thenReturn(producer);