Fixed spark receiver to account for all the consumer config options (#5152)

diff --git a/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java b/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java
index 8e124ed..833d42e 100644
--- a/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java
+++ b/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java
@@ -18,23 +18,23 @@
  */
 package org.apache.pulsar.spark;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import java.io.Serializable;
-import java.util.Set;
 
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.spark.storage.StorageLevel;
 import org.apache.spark.streaming.receiver.Receiver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
 public class SparkStreamingPulsarReceiver extends Receiver<byte[]> {
 
     private static final Logger LOG = LoggerFactory.getLogger(SparkStreamingPulsarReceiver.class);
@@ -66,16 +66,14 @@
         this.serviceUrl = serviceUrl;
         this.authentication = authentication;
 
-        if (conf.getAckTimeoutMillis() == 0) {
-            conf.setAckTimeoutMillis(60000);
-        }
         if (conf.getMessageListener() == null) {
-            conf.setMessageListener((MessageListener & Serializable) (consumer, msg) -> {
+            conf.setMessageListener((MessageListener<byte[]> & Serializable) (consumer, msg) -> {
                 try {
                     store(msg.getData());
                     consumer.acknowledgeAsync(msg);
                 } catch (Exception e) {
                     LOG.error("Failed to store a message : {}", e.getMessage());
+                    consumer.negativeAcknowledge(msg);
                 }
             });
         }
@@ -84,13 +82,9 @@
 
     public void onStart() {
         try {
-            Set<String> topicNames = conf.getTopicNames();
-            String[] topicNamesArray = new String[topicNames.size()];
-            topicNames.toArray(topicNamesArray);
             pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).authentication(authentication).build();
-            consumer = pulsarClient.newConsumer().topic(topicNamesArray).subscriptionName(conf.getSubscriptionName())
-                .messageListener(this.conf.getMessageListener()).subscribe();
-        } catch (PulsarClientException e) {
+            consumer = ((PulsarClientImpl) pulsarClient).subscribeAsync(conf).join();
+        } catch (Exception e) {
             LOG.error("Failed to start subscription : {}", e.getMessage());
             restart("Restart a consumer");
         }