Add Flink - Pulsar Batch Sink Support (#2979)

### Motivation

This PR aims to bring Flink - Pulsar `Batch Sink` Support. If user works with Flink `DataSet` API and would like to write these `DataSets` to Pulsar, this sink can help.

*Ref:* [Flink Batch Sink API](

### Modifications

Please find the change-set as follows:
- Defines `PulsarOutputFormat` to write Flink Batch `DataSets` into Pulsar.
- UT Coverage
- `FlinkPulsarBatchSinkExample` to show how to use and to be used by users.
- `` documentation
- Minor `javadoc` fix
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/
new file mode 100644
index 0000000..ac54248
--- /dev/null
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/
@@ -0,0 +1,111 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Preconditions;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.function.Function;
+ * Flink Batch Sink to write DataSets into a Pulsar topic.
+ */
+public class PulsarOutputFormat<T> extends RichOutputFormat<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarOutputFormat.class);
+    private static String serviceUrl;
+    private static String topicName;
+    private SerializationSchema<T> serializationSchema;
+    private transient Function<Throwable, MessageId> failureCallback;
+    private static volatile Producer<byte[]> producer;
+    public PulsarOutputFormat(String serviceUrl, String topicName, SerializationSchema<T> serializationSchema) {
+        Preconditions.checkNotNull(serviceUrl, "serviceUrl must not be null.");
+        Preconditions.checkArgument(StringUtils.isNotBlank(topicName),  "topicName must not be blank.");
+        Preconditions.checkNotNull(serializationSchema,  "serializationSchema must not be null.");
+        this.serviceUrl = serviceUrl;
+        this.topicName = topicName;
+        this.serializationSchema = serializationSchema;
+"PulsarOutputFormat is being started to write batches to Pulsar topic {}", this.topicName);
+    }
+    @Override
+    public void configure(Configuration configuration) {
+    }
+    @Override
+    public void open(int taskNumber, int numTasks) throws IOException {
+        this.producer = getProducerInstance();
+        this.failureCallback = cause -> {
+            LOG.error("Error while sending record to Pulsar : " + cause.getMessage(), cause);
+            return null;
+        };
+    }
+    @Override
+    public void writeRecord(T t) throws IOException {
+        byte[] data = this.serializationSchema.serialize(t);
+        this.producer.sendAsync(data)
+                .exceptionally(this.failureCallback);
+    }
+    @Override
+    public void close() throws IOException {
+    }
+    private static Producer<byte[]> getProducerInstance() throws PulsarClientException {
+        if(producer == null){
+            synchronized (PulsarOutputFormat.class) {
+                if(producer == null){
+                    producer = Preconditions.checkNotNull(createPulsarProducer(),
+                                                                "Pulsar producer must not be null.");
+                }
+            }
+        }
+        return producer;
+    }
+    private static Producer<byte[]> createPulsarProducer() throws PulsarClientException {
+        try {
+            PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
+            return client.newProducer().topic(topicName).create();
+        } catch (PulsarClientException e) {
+            LOG.error("Pulsar producer can not be created.", e);
+            throw e;
+        }
+    }
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/
index 2324c55..48bc0f1 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/
@@ -309,7 +309,7 @@
         if (e != null) {
             // prevent double throwing
             asyncException = null;
-            throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
+            throw new Exception("Failed to send data to Pulsar: " + e.getMessage(), e);
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/
new file mode 100644
index 0000000..5639709
--- /dev/null
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/
@@ -0,0 +1,57 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar;
+import org.junit.Test;
+import static org.junit.Assert.assertNotNull;
+ * Tests for PulsarOutputFormat
+ */
+public class PulsarOutputFormatTest {
+    @Test(expected = NullPointerException.class)
+    public void testPulsarOutputFormatConstructorWhenServiceUrlIsNull() {
+        new PulsarOutputFormat(null, "testTopic", text -> text.toString().getBytes());
+    }
+    @Test(expected = IllegalArgumentException.class)
+    public void testPulsarOutputFormatConstructorWhenTopicNameIsNull() {
+        new PulsarOutputFormat("testServiceUrl", null, text -> text.toString().getBytes());
+    }
+    @Test(expected = IllegalArgumentException.class)
+    public void testPulsarOutputFormatConstructorWhenTopicNameIsBlank() {
+        new PulsarOutputFormat("testServiceUrl", " ", text -> text.toString().getBytes());
+    }
+    @Test(expected = NullPointerException.class)
+    public void testPulsarOutputFormatConstructorWhenSerializationSchemaIsNull() {
+        new PulsarOutputFormat("testServiceUrl", "testTopic", null);
+    }
+    @Test
+    public void testPulsarOutputFormatConstructor() {
+        PulsarOutputFormat pulsarOutputFormat =
+                new PulsarOutputFormat("testServiceUrl", "testTopic", text -> text.toString().getBytes());
+        assertNotNull(pulsarOutputFormat);
+    }
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/
new file mode 100644
index 0000000..7b35065
--- /dev/null
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/
@@ -0,0 +1,102 @@
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar.example;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.batch.connectors.pulsar.PulsarOutputFormat;
+import org.apache.flink.util.Collector;
+ * Implements a batch word-count program on Pulsar topic by writing Flink DataSet.
+ */
+public class FlinkPulsarBatchSinkExample {
+    private static final String EINSTEIN_QUOTE = "Imagination is more important than knowledge. " +
+            "Knowledge is limited. Imagination encircles the world.";
+    private static final String SERVICE_URL = "pulsar://";
+    private static final String TOPIC_NAME = "my-flink-topic";
+    public static void main(String[] args) throws Exception {
+        // set up the execution environment
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        // create PulsarOutputFormat instance
+        final OutputFormat pulsarOutputFormat =
+                new PulsarOutputFormat(SERVICE_URL, TOPIC_NAME, wordWithCount -> wordWithCount.toString().getBytes());
+        // create DataSet
+        DataSet<String> textDS = env.fromElements(EINSTEIN_QUOTE);
+        textDS.flatMap(new FlatMapFunction<String, WordWithCount>() {
+            @Override
+            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
+                String[] words = value.toLowerCase().split(" ");
+                for(String word: words) {
+                    out.collect(new WordWithCount(word.replace(".", ""), 1));
+                }
+            }
+        })
+        // filter words which length is bigger than 4
+        .filter(wordWithCount -> wordWithCount.word.length() > 4)
+        .groupBy(new KeySelector<WordWithCount, String>() {
+            @Override
+            public String getKey(WordWithCount wordWithCount) throws Exception {
+                return wordWithCount.word;
+            }
+        })
+        .reduce(new ReduceFunction<WordWithCount>() {
+            @Override
+            public WordWithCount reduce(WordWithCount wordWithCount1, WordWithCount wordWithCount2) throws Exception {
+                return  new WordWithCount(wordWithCount1.word, wordWithCount1.count + wordWithCount2.count);
+            }
+        })
+        // write batch data to Pulsar
+        .output(pulsarOutputFormat);
+        // execute program
+        env.execute("Flink - Pulsar Batch WordCount");
+    }
+    /**
+     * Data type for words with count.
+     */
+    private static class WordWithCount {
+        public String word;
+        public long count;
+        public WordWithCount(String word, long count) {
+            this.word = word;
+            this.count = count;
+        }
+        @Override
+        public String toString() {
+            return "WordWithCount { word = " + word + ", count = " + count + " }";
+        }
+    }
\ No newline at end of file
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/
new file mode 100644
index 0000000..f3baf00
--- /dev/null
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/
@@ -0,0 +1,106 @@
+The Flink Batch Sink for Pulsar is a custom sink that enables Apache [Flink]( to write [DataSet]( to Pulsar.
+## Prerequisites
+To use this sink, include a dependency for the `pulsar-flink` library in your Java configuration.
+### Maven
+If you're using Maven, add this to your `pom.xml`:
+<!-- in your <properties> block -->
+<!-- in your <dependencies> block -->
+  <groupId>org.apache.pulsar</groupId>
+  <artifactId>pulsar-flink</artifactId>
+  <version>${pulsar.version}</version>
+### Gradle
+If you're using Gradle, add this to your `build.gradle` file:
+def pulsarVersion = "{{pulsar:version}}"
+dependencies {
+    compile group: 'org.apache.pulsar', name: 'pulsar-flink', version: pulsarVersion
+## Usage
+Please find a sample usage as follows:
+        private static final String EINSTEIN_QUOTE = "Imagination is more important than knowledge. " +
+                "Knowledge is limited. Imagination encircles the world.";
+        private static final String SERVICE_URL = "pulsar://";
+        private static final String TOPIC_NAME = "my-flink-topic";
+        public static void main(String[] args) throws Exception {
+            // set up the execution environment
+            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+            // create PulsarOutputFormat instance
+            final OutputFormat<String> pulsarOutputFormat =
+                    new PulsarOutputFormat(SERVICE_URL, TOPIC_NAME, wordWithCount -> wordWithCount.toString().getBytes());
+            // create DataSet
+            DataSet<String> textDS = env.fromElements(EINSTEIN_QUOTE);
+            textDS.flatMap(new FlatMapFunction<String, String>() {
+                @Override
+                public void flatMap(String value, Collector<String> out) throws Exception {
+                    String[] words = value.toLowerCase().split(" ");
+                    for(String word: words) {
+                        out.collect(word.replace(".", ""));
+                    }
+                }
+            })
+            // filter words which length is bigger than 4
+            .filter(word -> word.length() > 4)
+            // write batch data to Pulsar
+            .output(pulsarOutputFormat);
+            // execute program
+            env.execute("Flink - Pulsar Batch WordCount");
+        }
+## Sample Output
+Please find sample output for above application as follows:
+## Complete Example
+You can find a complete example [here](
+In this example, Flink DataSet is processed as word-count and being written to Pulsar.
+## Complete Example Output
+Please find sample output for above linked application as follows:
+WordWithCount { word = important, count = 1 }
+WordWithCount { word = encircles, count = 1 }
+WordWithCount { word = imagination, count = 2 }
+WordWithCount { word = knowledge, count = 2 }
+WordWithCount { word = limited, count = 1 }
+WordWithCount { word = world, count = 1 }
\ No newline at end of file