Fixed spark receiver to account for all the consumer config options (#5152)
diff --git a/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java b/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java
index 8e124ed..833d42e 100644
--- a/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java
+++ b/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java
@@ -18,23 +18,23 @@
*/
package org.apache.pulsar.spark;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import java.io.Serializable;
-import java.util.Set;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
public class SparkStreamingPulsarReceiver extends Receiver<byte[]> {
private static final Logger LOG = LoggerFactory.getLogger(SparkStreamingPulsarReceiver.class);
@@ -66,16 +66,14 @@
this.serviceUrl = serviceUrl;
this.authentication = authentication;
- if (conf.getAckTimeoutMillis() == 0) {
- conf.setAckTimeoutMillis(60000);
- }
if (conf.getMessageListener() == null) {
- conf.setMessageListener((MessageListener & Serializable) (consumer, msg) -> {
+ conf.setMessageListener((MessageListener<byte[]> & Serializable) (consumer, msg) -> {
try {
store(msg.getData());
consumer.acknowledgeAsync(msg);
} catch (Exception e) {
LOG.error("Failed to store a message : {}", e.getMessage());
+ consumer.negativeAcknowledge(msg);
}
});
}
@@ -84,13 +82,9 @@
public void onStart() {
try {
- Set<String> topicNames = conf.getTopicNames();
- String[] topicNamesArray = new String[topicNames.size()];
- topicNames.toArray(topicNamesArray);
pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).authentication(authentication).build();
- consumer = pulsarClient.newConsumer().topic(topicNamesArray).subscriptionName(conf.getSubscriptionName())
- .messageListener(this.conf.getMessageListener()).subscribe();
- } catch (PulsarClientException e) {
+ consumer = ((PulsarClientImpl) pulsarClient).subscribeAsync(conf).join();
+ } catch (Exception e) {
LOG.error("Failed to start subscription : {}", e.getMessage());
restart("Restart a consumer");
}