[pulsar-spark] upgrade SparkStreamingPulsarReceiver.java use pulsar-client and add spark example (#4143)

### Motivation

upgrade SparkStreamingPulsarReceiver.java use pulsar-client and add spark example

### Modifications

1. upgrade  SparkStreamingPulsarReceiver.java use pulsar-client, remove pulsar-client-1x pom
2. add  simple spark example
diff --git a/pulsar-spark/pom.xml b/pulsar-spark/pom.xml
index a903af4..08a0f75 100644
--- a/pulsar-spark/pom.xml
+++ b/pulsar-spark/pom.xml
@@ -37,8 +37,14 @@
 
     <dependency>
       <groupId>${project.groupId}</groupId>
-      <artifactId>pulsar-client-1x</artifactId>
+      <artifactId>pulsar-client</artifactId>
       <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <dependency>
@@ -49,6 +55,12 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-streaming_2.10</artifactId>
+      <exclusions>
+        <exclusion>
+          <artifactId>jackson-annotations</artifactId>
+          <groupId>com.fasterxml.jackson.core</groupId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <dependency>
diff --git a/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java b/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java
index c761b2d..8e124ed 100644
--- a/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java
+++ b/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java
@@ -19,76 +19,93 @@
 package org.apache.pulsar.spark;
 
 import java.io.Serializable;
-import java.util.concurrent.TimeUnit;
+import java.util.Set;
 
-import org.apache.pulsar.client.api.ClientConfiguration;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.receiver.Receiver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 public class SparkStreamingPulsarReceiver extends Receiver<byte[]> {
 
-    private ClientConfiguration clientConfiguration;
-    private ConsumerConfiguration consumerConfiguration;
-    private PulsarClient pulsarClient;
-    private String url;
-    private String topic;
-    private String subscription;
+    private static final Logger LOG = LoggerFactory.getLogger(SparkStreamingPulsarReceiver.class);
 
-    public SparkStreamingPulsarReceiver(ClientConfiguration clientConfiguration,
-            ConsumerConfiguration consumerConfiguration, String url, String topic, String subscription) {
-        this(StorageLevel.MEMORY_AND_DISK_2(), clientConfiguration, consumerConfiguration, url, topic, subscription);
+    private String serviceUrl;
+    private ConsumerConfigurationData<byte[]> conf;
+    private Authentication authentication;
+    private PulsarClient pulsarClient;
+    private Consumer<byte[]> consumer;
+
+    public SparkStreamingPulsarReceiver(
+        String serviceUrl,
+        ConsumerConfigurationData<byte[]> conf,
+        Authentication authentication) {
+        this(StorageLevel.MEMORY_AND_DISK_2(), serviceUrl, conf, authentication);
     }
 
-    public SparkStreamingPulsarReceiver(StorageLevel storageLevel, ClientConfiguration clientConfiguration,
-            ConsumerConfiguration consumerConfiguration, String url, String topic, String subscription) {
+    public SparkStreamingPulsarReceiver(StorageLevel storageLevel,
+        String serviceUrl,
+        ConsumerConfigurationData<byte[]> conf,
+        Authentication authentication) {
         super(storageLevel);
-        checkNotNull(clientConfiguration, "ClientConfiguration must not be null");
-        checkNotNull(consumerConfiguration, "ConsumerConfiguration must not be null");
-        this.clientConfiguration = clientConfiguration;
-        this.url = url;
-        this.topic = topic;
-        this.subscription = subscription;
-        if (consumerConfiguration.getAckTimeoutMillis() == 0) {
-            consumerConfiguration.setAckTimeout(60, TimeUnit.SECONDS);
+
+        checkNotNull(serviceUrl, "serviceUrl must not be null");
+        checkNotNull(conf, "ConsumerConfigurationData must not be null");
+        checkArgument(conf.getTopicNames().size() > 0, "TopicNames must be set a value.");
+        checkNotNull(conf.getSubscriptionName(), "SubscriptionName must not be null");
+
+        this.serviceUrl = serviceUrl;
+        this.authentication = authentication;
+
+        if (conf.getAckTimeoutMillis() == 0) {
+            conf.setAckTimeoutMillis(60000);
         }
-        consumerConfiguration.setMessageListener((MessageListener & Serializable) (consumer, msg) -> {
-            try {
-                store(msg.getData());
-                consumer.acknowledgeAsync(msg);
-            } catch (Exception e) {
-                log.error("Failed to store a message : {}", e.getMessage());
-            }
-        });
-        this.consumerConfiguration = consumerConfiguration;
+        if (conf.getMessageListener() == null) {
+            conf.setMessageListener((MessageListener & Serializable) (consumer, msg) -> {
+                try {
+                    store(msg.getData());
+                    consumer.acknowledgeAsync(msg);
+                } catch (Exception e) {
+                    LOG.error("Failed to store a message : {}", e.getMessage());
+                }
+            });
+        }
+        this.conf = conf;
     }
 
     public void onStart() {
         try {
-            pulsarClient = PulsarClient.create(url, clientConfiguration);
-            pulsarClient.subscribe(topic, subscription, consumerConfiguration);
+            Set<String> topicNames = conf.getTopicNames();
+            String[] topicNamesArray = new String[topicNames.size()];
+            topicNames.toArray(topicNamesArray);
+            pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).authentication(authentication).build();
+            consumer = pulsarClient.newConsumer().topic(topicNamesArray).subscriptionName(conf.getSubscriptionName())
+                .messageListener(this.conf.getMessageListener()).subscribe();
         } catch (PulsarClientException e) {
-            log.error("Failed to start subscription : {}", e.getMessage());
+            LOG.error("Failed to start subscription : {}", e.getMessage());
             restart("Restart a consumer");
         }
     }
 
     public void onStop() {
         try {
+            if (consumer != null) {
+                consumer.close();
+            }
             if (pulsarClient != null) {
                 pulsarClient.close();
             }
         } catch (PulsarClientException e) {
-            log.error("Failed to close client : {}", e.getMessage());
+            LOG.error("Failed to close client : {}", e.getMessage());
         }
     }
-
-    private static final Logger log = LoggerFactory.getLogger(SparkStreamingPulsarReceiver.class);
 }
\ No newline at end of file