[pulsar-spark] upgrade SparkStreamingPulsarReceiver.java use pulsar-client and add spark example (#4143)
### Motivation
upgrade SparkStreamingPulsarReceiver.java use pulsar-client and add spark example
### Modifications
1. upgrade SparkStreamingPulsarReceiver.java use pulsar-client, remove pulsar-client-1x pom
2. add simple spark example
diff --git a/pulsar-spark/pom.xml b/pulsar-spark/pom.xml
index a903af4..08a0f75 100644
--- a/pulsar-spark/pom.xml
+++ b/pulsar-spark/pom.xml
@@ -37,8 +37,14 @@
<dependency>
<groupId>${project.groupId}</groupId>
- <artifactId>pulsar-client-1x</artifactId>
+ <artifactId>pulsar-client</artifactId>
<version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
@@ -49,6 +55,12 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
+ <exclusions>
+ <exclusion>
+ <artifactId>jackson-annotations</artifactId>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
diff --git a/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java b/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java
index c761b2d..8e124ed 100644
--- a/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java
+++ b/pulsar-spark/src/main/java/org/apache/pulsar/spark/SparkStreamingPulsarReceiver.java
@@ -19,76 +19,93 @@
package org.apache.pulsar.spark;
import java.io.Serializable;
-import java.util.concurrent.TimeUnit;
+import java.util.Set;
-import org.apache.pulsar.client.api.ClientConfiguration;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
public class SparkStreamingPulsarReceiver extends Receiver<byte[]> {
- private ClientConfiguration clientConfiguration;
- private ConsumerConfiguration consumerConfiguration;
- private PulsarClient pulsarClient;
- private String url;
- private String topic;
- private String subscription;
+ private static final Logger LOG = LoggerFactory.getLogger(SparkStreamingPulsarReceiver.class);
- public SparkStreamingPulsarReceiver(ClientConfiguration clientConfiguration,
- ConsumerConfiguration consumerConfiguration, String url, String topic, String subscription) {
- this(StorageLevel.MEMORY_AND_DISK_2(), clientConfiguration, consumerConfiguration, url, topic, subscription);
+ private String serviceUrl;
+ private ConsumerConfigurationData<byte[]> conf;
+ private Authentication authentication;
+ private PulsarClient pulsarClient;
+ private Consumer<byte[]> consumer;
+
+ public SparkStreamingPulsarReceiver(
+ String serviceUrl,
+ ConsumerConfigurationData<byte[]> conf,
+ Authentication authentication) {
+ this(StorageLevel.MEMORY_AND_DISK_2(), serviceUrl, conf, authentication);
}
- public SparkStreamingPulsarReceiver(StorageLevel storageLevel, ClientConfiguration clientConfiguration,
- ConsumerConfiguration consumerConfiguration, String url, String topic, String subscription) {
+ public SparkStreamingPulsarReceiver(StorageLevel storageLevel,
+ String serviceUrl,
+ ConsumerConfigurationData<byte[]> conf,
+ Authentication authentication) {
super(storageLevel);
- checkNotNull(clientConfiguration, "ClientConfiguration must not be null");
- checkNotNull(consumerConfiguration, "ConsumerConfiguration must not be null");
- this.clientConfiguration = clientConfiguration;
- this.url = url;
- this.topic = topic;
- this.subscription = subscription;
- if (consumerConfiguration.getAckTimeoutMillis() == 0) {
- consumerConfiguration.setAckTimeout(60, TimeUnit.SECONDS);
+
+ checkNotNull(serviceUrl, "serviceUrl must not be null");
+ checkNotNull(conf, "ConsumerConfigurationData must not be null");
+ checkArgument(conf.getTopicNames().size() > 0, "TopicNames must be set a value.");
+ checkNotNull(conf.getSubscriptionName(), "SubscriptionName must not be null");
+
+ this.serviceUrl = serviceUrl;
+ this.authentication = authentication;
+
+ if (conf.getAckTimeoutMillis() == 0) {
+ conf.setAckTimeoutMillis(60000);
}
- consumerConfiguration.setMessageListener((MessageListener & Serializable) (consumer, msg) -> {
- try {
- store(msg.getData());
- consumer.acknowledgeAsync(msg);
- } catch (Exception e) {
- log.error("Failed to store a message : {}", e.getMessage());
- }
- });
- this.consumerConfiguration = consumerConfiguration;
+ if (conf.getMessageListener() == null) {
+ conf.setMessageListener((MessageListener & Serializable) (consumer, msg) -> {
+ try {
+ store(msg.getData());
+ consumer.acknowledgeAsync(msg);
+ } catch (Exception e) {
+ LOG.error("Failed to store a message : {}", e.getMessage());
+ }
+ });
+ }
+ this.conf = conf;
}
public void onStart() {
try {
- pulsarClient = PulsarClient.create(url, clientConfiguration);
- pulsarClient.subscribe(topic, subscription, consumerConfiguration);
+ Set<String> topicNames = conf.getTopicNames();
+ String[] topicNamesArray = new String[topicNames.size()];
+ topicNames.toArray(topicNamesArray);
+ pulsarClient = PulsarClient.builder().serviceUrl(serviceUrl).authentication(authentication).build();
+ consumer = pulsarClient.newConsumer().topic(topicNamesArray).subscriptionName(conf.getSubscriptionName())
+ .messageListener(this.conf.getMessageListener()).subscribe();
} catch (PulsarClientException e) {
- log.error("Failed to start subscription : {}", e.getMessage());
+ LOG.error("Failed to start subscription : {}", e.getMessage());
restart("Restart a consumer");
}
}
public void onStop() {
try {
+ if (consumer != null) {
+ consumer.close();
+ }
if (pulsarClient != null) {
pulsarClient.close();
}
} catch (PulsarClientException e) {
- log.error("Failed to close client : {}", e.getMessage());
+ LOG.error("Failed to close client : {}", e.getMessage());
}
}
-
- private static final Logger log = LoggerFactory.getLogger(SparkStreamingPulsarReceiver.class);
}
\ No newline at end of file