Add Flink - Pulsar Batch Sink Support (#2979)

### Motivation

This PR aims to bring Flink - Pulsar `Batch Sink` Support. If user works with Flink `DataSet` API and would like to write these `DataSets` to Pulsar, this sink can help.

*Ref:* [Flink Batch Sink API](https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/#data-sinks)

### Modifications

Please find the change-set as follows:
- Defines `PulsarOutputFormat` to write Flink Batch `DataSets` into Pulsar.
- UT Coverage
- `FlinkPulsarBatchSinkExample` to show how to use and to be used by users.
- `README.md` documentation
- Minor `javadoc` fix
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
new file mode 100644
index 0000000..ac54248
--- /dev/null
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Preconditions;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.function.Function;
+
+/**
+ * Flink Batch Sink to write DataSets into a Pulsar topic.
+ */
+public class PulsarOutputFormat<T> extends RichOutputFormat<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarOutputFormat.class);
+
+    private static String serviceUrl;
+    private static String topicName;
+    private SerializationSchema<T> serializationSchema;
+
+    private transient Function<Throwable, MessageId> failureCallback;
+
+    private static volatile Producer<byte[]> producer;
+
+    public PulsarOutputFormat(String serviceUrl, String topicName, SerializationSchema<T> serializationSchema) {
+        Preconditions.checkNotNull(serviceUrl, "serviceUrl must not be null.");
+        Preconditions.checkArgument(StringUtils.isNotBlank(topicName),  "topicName must not be blank.");
+        Preconditions.checkNotNull(serializationSchema,  "serializationSchema must not be null.");
+
+        this.serviceUrl = serviceUrl;
+        this.topicName = topicName;
+        this.serializationSchema = serializationSchema;
+
+        LOG.info("PulsarOutputFormat is being started to write batches to Pulsar topic {}", this.topicName);
+    }
+
+    @Override
+    public void configure(Configuration configuration) {
+
+    }
+
+    @Override
+    public void open(int taskNumber, int numTasks) throws IOException {
+        this.producer = getProducerInstance();
+
+        this.failureCallback = cause -> {
+            LOG.error("Error while sending record to Pulsar : " + cause.getMessage(), cause);
+            return null;
+        };
+    }
+
+    @Override
+    public void writeRecord(T t) throws IOException {
+        byte[] data = this.serializationSchema.serialize(t);
+        this.producer.sendAsync(data)
+                .exceptionally(this.failureCallback);
+    }
+
+    @Override
+    public void close() throws IOException {
+
+    }
+
+    private static Producer<byte[]> getProducerInstance() throws PulsarClientException {
+        if(producer == null){
+            synchronized (PulsarOutputFormat.class) {
+                if(producer == null){
+                    producer = Preconditions.checkNotNull(createPulsarProducer(),
+                                                                "Pulsar producer must not be null.");
+                }
+            }
+        }
+        return producer;
+    }
+
+    private static Producer<byte[]> createPulsarProducer() throws PulsarClientException {
+        try {
+            PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
+            return client.newProducer().topic(topicName).create();
+        } catch (PulsarClientException e) {
+            LOG.error("Pulsar producer can not be created.", e);
+            throw e;
+        }
+    }
+}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
index 2324c55..48bc0f1 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
@@ -309,7 +309,7 @@
         if (e != null) {
             // prevent double throwing
             asyncException = null;
-            throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
+            throw new Exception("Failed to send data to Pulsar: " + e.getMessage(), e);
         }
     }
 
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
new file mode 100644
index 0000000..5639709
--- /dev/null
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Tests for PulsarOutputFormat
+ */
+public class PulsarOutputFormatTest {
+
+    @Test(expected = NullPointerException.class)
+    public void testPulsarOutputFormatConstructorWhenServiceUrlIsNull() {
+        new PulsarOutputFormat(null, "testTopic", text -> text.toString().getBytes());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testPulsarOutputFormatConstructorWhenTopicNameIsNull() {
+        new PulsarOutputFormat("testServiceUrl", null, text -> text.toString().getBytes());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testPulsarOutputFormatConstructorWhenTopicNameIsBlank() {
+        new PulsarOutputFormat("testServiceUrl", " ", text -> text.toString().getBytes());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testPulsarOutputFormatConstructorWhenSerializationSchemaIsNull() {
+        new PulsarOutputFormat("testServiceUrl", "testTopic", null);
+    }
+
+    @Test
+    public void testPulsarOutputFormatConstructor() {
+        PulsarOutputFormat pulsarOutputFormat =
+                new PulsarOutputFormat("testServiceUrl", "testTopic", text -> text.toString().getBytes());
+        assertNotNull(pulsarOutputFormat);
+    }
+
+}
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
new file mode 100644
index 0000000..7b35065
--- /dev/null
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.batch.connectors.pulsar.example;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.batch.connectors.pulsar.PulsarOutputFormat;
+import org.apache.flink.util.Collector;
+
+/**
+ * Implements a batch word-count program on Pulsar topic by writing Flink DataSet.
+ */
+public class FlinkPulsarBatchSinkExample {
+
+    private static final String EINSTEIN_QUOTE = "Imagination is more important than knowledge. " +
+            "Knowledge is limited. Imagination encircles the world.";
+
+    private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
+    private static final String TOPIC_NAME = "my-flink-topic";
+
+    public static void main(String[] args) throws Exception {
+
+        // set up the execution environment
+        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        // create PulsarOutputFormat instance
+        final OutputFormat pulsarOutputFormat =
+                new PulsarOutputFormat(SERVICE_URL, TOPIC_NAME, wordWithCount -> wordWithCount.toString().getBytes());
+
+        // create DataSet
+        DataSet<String> textDS = env.fromElements(EINSTEIN_QUOTE);
+
+        textDS.flatMap(new FlatMapFunction<String, WordWithCount>() {
+            @Override
+            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
+                String[] words = value.toLowerCase().split(" ");
+                for(String word: words) {
+                    out.collect(new WordWithCount(word.replace(".", ""), 1));
+                }
+            }
+        })
+        // filter words which length is bigger than 4
+        .filter(wordWithCount -> wordWithCount.word.length() > 4)
+        .groupBy(new KeySelector<WordWithCount, String>() {
+            @Override
+            public String getKey(WordWithCount wordWithCount) throws Exception {
+                return wordWithCount.word;
+            }
+        })
+        .reduce(new ReduceFunction<WordWithCount>() {
+            @Override
+            public WordWithCount reduce(WordWithCount wordWithCount1, WordWithCount wordWithCount2) throws Exception {
+                return  new WordWithCount(wordWithCount1.word, wordWithCount1.count + wordWithCount2.count);
+            }
+        })
+        // write batch data to Pulsar
+        .output(pulsarOutputFormat);
+
+        // execute program
+        env.execute("Flink - Pulsar Batch WordCount");
+
+    }
+
+    /**
+     * Data type for words with count.
+     */
+    private static class WordWithCount {
+
+        public String word;
+        public long count;
+
+        public WordWithCount(String word, long count) {
+            this.word = word;
+            this.count = count;
+        }
+
+        @Override
+        public String toString() {
+            return "WordWithCount { word = " + word + ", count = " + count + " }";
+        }
+    }
+}
\ No newline at end of file
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/README.md b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/README.md
new file mode 100644
index 0000000..f3baf00
--- /dev/null
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/README.md
@@ -0,0 +1,106 @@
+The Flink Batch Sink for Pulsar is a custom sink that enables Apache [Flink](https://flink.apache.org/) to write [DataSet](https://ci.apache.org/projects/flink/flink-docs-stable/dev/batch/index.html) to Pulsar.
+
+## Prerequisites
+
+To use this sink, include a dependency for the `pulsar-flink` library in your Java configuration.
+
+### Maven
+
+If you're using Maven, add this to your `pom.xml`:
+
+```xml
+<!-- in your <properties> block -->
+<pulsar.version>{{pulsar:version}}</pulsar.version>
+
+<!-- in your <dependencies> block -->
+<dependency>
+  <groupId>org.apache.pulsar</groupId>
+  <artifactId>pulsar-flink</artifactId>
+  <version>${pulsar.version}</version>
+</dependency>
+```
+
+### Gradle
+
+If you're using Gradle, add this to your `build.gradle` file:
+
+```groovy
+def pulsarVersion = "{{pulsar:version}}"
+
+dependencies {
+    compile group: 'org.apache.pulsar', name: 'pulsar-flink', version: pulsarVersion
+}
+```
+
+## Usage
+
+Please find a sample usage as follows:
+
+```java
+        private static final String EINSTEIN_QUOTE = "Imagination is more important than knowledge. " +
+                "Knowledge is limited. Imagination encircles the world.";
+
+        private static final String SERVICE_URL = "pulsar://127.0.0.1:6650";
+        private static final String TOPIC_NAME = "my-flink-topic";
+
+        public static void main(String[] args) throws Exception {
+
+            // set up the execution environment
+            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+            // create PulsarOutputFormat instance
+            final OutputFormat<String> pulsarOutputFormat =
+                    new PulsarOutputFormat(SERVICE_URL, TOPIC_NAME, wordWithCount -> wordWithCount.toString().getBytes());
+
+            // create DataSet
+            DataSet<String> textDS = env.fromElements(EINSTEIN_QUOTE);
+
+            textDS.flatMap(new FlatMapFunction<String, String>() {
+                @Override
+                public void flatMap(String value, Collector<String> out) throws Exception {
+                    String[] words = value.toLowerCase().split(" ");
+                    for(String word: words) {
+                        out.collect(word.replace(".", ""));
+                    }
+                }
+            })
+            // filter words which length is bigger than 4
+            .filter(word -> word.length() > 4)
+
+            // write batch data to Pulsar
+            .output(pulsarOutputFormat);
+
+            // execute program
+            env.execute("Flink - Pulsar Batch WordCount");
+        }
+```
+
+## Sample Output
+
+Please find sample output for above application as follows:
+```
+imagination
+important
+knowledge
+knowledge
+limited
+imagination
+encircles
+world
+```
+
+## Complete Example
+
+You can find a complete example [here](https://github.com/apache/incubator-pulsar/tree/master/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/example/FlinkPulsarBatchSinkExample.java).
+In this example, Flink DataSet is processed as word-count and being written to Pulsar.
+
+## Complete Example Output
+Please find sample output for above linked application as follows:
+```
+WordWithCount { word = important, count = 1 }
+WordWithCount { word = encircles, count = 1 }
+WordWithCount { word = imagination, count = 2 }
+WordWithCount { word = knowledge, count = 2 }
+WordWithCount { word = limited, count = 1 }
+WordWithCount { word = world, count = 1 }
+```
\ No newline at end of file