This page describes how to use the receiver to read Pulsar topics with Apache Spark stream processing applications.
This Spark Streaming job consumes messages from a Pulsar topic and counts the words in them as they arrive. The job can write the word count results to stdout or to another Pulsar topic.
To run the example locally, modify the main method of PulsarSparkReceiverWordCount.java as follows:
public static void main(String[] args) throws InterruptedException {
    String serviceUrl = "pulsar://localhost:6650/";
    String topic = "persistent://public/default/test_src";
    String subs = "test_sub";

    SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("Pulsar Spark Example");

    JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(60));

    ConsumerConfigurationData<byte[]> pulsarConf = new ConsumerConfigurationData();

    Set<String> set = new HashSet<>();
    set.add(topic);
    pulsarConf.setTopicNames(set);
    pulsarConf.setSubscriptionName(subs);

    SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver(
        serviceUrl,
        pulsarConf,
        new AuthenticationDisabled());

    JavaReceiverInputDStream<byte[]> lineDStream = jsc.receiverStream(pulsarReceiver);
    JavaPairDStream<String, Integer> result = lineDStream.flatMap(x -> {
        String line = new String(x, Charset.forName("UTF-8"));
        List<String> list = Arrays.asList(line.split(" "));
        return list.iterator();
    })
        .mapToPair(x -> new Tuple2<String, Integer>(x, 1))
        .reduceByKey((x, y) -> x + y);

    result.print();

    jsc.start();
    jsc.awaitTermination();
}
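To make the transformation chain easier to follow, here is a minimal sketch in plain Java, without Spark or Pulsar, of what flatMap, mapToPair, and reduceByKey compute for a single batch of message payloads. The class BatchWordCount and its method countWords are hypothetical names used only for illustration; they are not part of the Pulsar examples.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of the Pulsar or Spark examples.
class BatchWordCount {

    // Mirrors flatMap -> mapToPair -> reduceByKey for one batch of payloads.
    static Map<String, Integer> countWords(List<byte[]> batch) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (byte[] payload : batch) {
            // flatMap step: decode the payload as UTF-8 and split on single spaces
            String line = new String(payload, StandardCharsets.UTF_8);
            for (String word : line.split(" ")) {
                // mapToPair + reduceByKey steps: emit (word, 1) and sum per word
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Two made-up payloads standing in for messages received in one batch
        List<byte[]> batch = Arrays.asList(
                "producer spark".getBytes(StandardCharsets.UTF_8),
                "spark streaming".getBytes(StandardCharsets.UTF_8));
        countWords(batch).forEach(
                (word, n) -> System.out.println("(" + word + "," + n + ")"));
    }
}
```

The sketch uses a LinkedHashMap so the printed order follows first appearance. In the Spark job the order of the printed pairs is not guaranteed.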
To run the example with spark-submit, follow these steps:
Start Pulsar Standalone.
Follow the instructions to start a Pulsar standalone cluster locally.
$ bin/pulsar standalone
Build the examples.
$ cd ${PULSAR_HOME}
$ mvn clean install -DskipTests
Run the word count example with Spark to print results to stdout.
$ ${SPARK_HOME}/bin/spark-submit --class org.apache.spark.streaming.receiver.example.SparkStreamingPulsarReceiverExample \
    --master local[2] \
    --packages org.apache.pulsar:pulsar-client:${project.version},org.apache.pulsar:pulsar-spark:${project.version} \
    ${PULSAR_HOME}/examples/spark/target/pulsar-spark-examples.jar \
    pulsar://localhost:6650 test_src test_sub
When you publish data to the topic with a Pulsar producer such as ProducerSparkReceiverData, results similar to the following are printed to stdout:
(streaming,100)
(producer,100)
(spark,100)
(msg,100)
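Because reduceByKey on a DStream is applied to each batch separately, the counts printed by the job cover one batch interval (60 seconds in the local example) and do not accumulate across batches. The following plain-Java simulation sketches that behavior. It assumes 100 messages, each containing the text "producer spark streaming msg", all arriving within the same interval; the message text, the arrival times, and the class BatchIntervalDemo are assumptions made for illustration, and real Spark assigns received data to batches through its own block mechanism rather than by this simple division.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation; arrival times and message text are made up.
class BatchIntervalDemo {

    // Same length as Durations.seconds(60) in the local example
    static final long BATCH_MILLIS = 60_000L;

    // Groups messages by batch index (arrival time / interval),
    // then counts words separately inside each batch.
    static Map<Long, Map<String, Integer>> countPerBatch(long[] arrivalMillis, String[] messages) {
        Map<Long, Map<String, Integer>> perBatch = new TreeMap<>();
        for (int i = 0; i < messages.length; i++) {
            long batchIndex = arrivalMillis[i] / BATCH_MILLIS;
            Map<String, Integer> counts =
                    perBatch.computeIfAbsent(batchIndex, k -> new TreeMap<>());
            for (String word : messages[i].split(" ")) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        return perBatch;
    }

    public static void main(String[] args) {
        int n = 100;
        long[] arrivals = new long[n];
        String[] messages = new String[n];
        for (int i = 0; i < n; i++) {
            arrivals[i] = i * 10L; // all within the first 60-second batch
            messages[i] = "producer spark streaming msg"; // assumed message text
        }
        // One entry per batch index, each holding that batch's word counts
        System.out.println(countPerBatch(arrivals, messages));
    }
}
```

If the same messages were spread over two intervals, the simulation would report two smaller sets of counts instead of one set of 100, which is how output from the streaming job would differ when the producer runs across a batch boundary.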