blob: 83a7e42ed7a6d2985940262b167a19f518c90d62 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.spark;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.client.SiteToSiteClientConfig;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;
/**
* <p>
* The <code>NiFiReceiver</code> is a Reliable Receiver that provides a way to
* pull data from Apache NiFi so that it can be processed by Spark Streaming.
* The NiFi Receiver connects to NiFi instance provided in the config and
* requests data from the OutputPort that is named. In NiFi, when an OutputPort
* is added to the root process group, it acts as a queue of data for remote
* clients. This receiver is then able to pull that data from NiFi reliably.
* </p>
*
* <p>
* It is important to note that if pulling data from a NiFi cluster, the URL
* that should be used is that of the NiFi Cluster Manager. The Receiver will
* automatically handle determining the nodes in that cluster and pull from
* those nodes as appropriate.
* </p>
*
* <p>
* In order to use the NiFiReceiver, you will need to first build a
* {@link SiteToSiteClientConfig} to provide to the constructor. This can be
* achieved by using the {@link SiteToSiteClient.Builder}. Below is an example
* snippet of driver code to pull data from NiFi that is running on
* localhost:8080. This example assumes that NiFi exposes and OutputPort on the
* root group named "Data For Spark". Additionally, it assumes that the data
* that it will receive from this OutputPort is text data, as it will map the
* byte array received from NiFi to a UTF-8 Encoded string.
* </p>
*
* <code>
* <pre>
* {@code
* Pattern SPACE = Pattern.compile(" ");
*
* // Build a Site-to-site client config
* SiteToSiteClientConfig config = new SiteToSiteClient.Builder()
* .setUrl("http://localhost:8080/nifi")
* .setPortName("Data For Spark")
* .buildConfig();
*
* SparkConf sparkConf = new SparkConf().setAppName("NiFi-Spark Streaming example");
* JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000L));
*
* // Create a JavaReceiverInputDStream using a NiFi receiver so that we can pull data from
* // specified Port
* JavaReceiverInputDStream<NiFiDataPacket> packetStream =
* ssc.receiverStream(new NiFiReceiver(clientConfig, StorageLevel.MEMORY_ONLY()));
*
* // Map the data from NiFi to text, ignoring the attributes
* JavaDStream<String> text = packetStream.map(new Function<NiFiDataPacket, String>() {
* public String call(final NiFiDataPacket dataPacket) throws Exception {
* return new String(dataPacket.getContent(), StandardCharsets.UTF_8);
* }
* });
*
* // Split the words by spaces
* JavaDStream<String> words = text.flatMap(new FlatMapFunction<String, String>() {
* public Iterable<String> call(final String text) throws Exception {
* return Arrays.asList(SPACE.split(text));
* }
* });
*
* // Map each word to the number 1, then aggregate by key
* JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
* new PairFunction<String, String, Integer>() {
* public Tuple2<String, Integer> call(String s) {
* return new Tuple2<String, Integer>(s, 1);
* }
* }).reduceByKey(new Function2<Integer, Integer, Integer>() {
* public Integer call(Integer i1, Integer i2) {
* return i1 + i2;
* }
* }
* );
*
* // print the results
* wordCounts.print();
* ssc.start();
* ssc.awaitTermination();
* }
* </pre>
* </code>
*/
public class NiFiReceiver extends Receiver<NiFiDataPacket> {
private static final long serialVersionUID = 3067274587595578836L;
private final SiteToSiteClientConfig clientConfig;
public NiFiReceiver(final SiteToSiteClientConfig clientConfig, final StorageLevel storageLevel) {
super(storageLevel);
this.clientConfig = clientConfig;
}
@Override
public void onStart() {
final Thread thread = new Thread(new ReceiveRunnable());
thread.setDaemon(true);
thread.setName("NiFi Receiver");
thread.start();
}
@Override
public void onStop() {
}
class ReceiveRunnable implements Runnable {
public ReceiveRunnable() {
}
@Override
public void run() {
try {
final SiteToSiteClient client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build();
try {
while (!isStopped()) {
final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE);
DataPacket dataPacket = transaction.receive();
if (dataPacket == null) {
transaction.confirm();
transaction.complete();
// no data available. Wait a bit and try again
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
}
continue;
}
final List<NiFiDataPacket> dataPackets = new ArrayList<>();
do {
// Read the data into a byte array and wrap it along with the attributes
// into a NiFiDataPacket.
final InputStream inStream = dataPacket.getData();
final byte[] data = new byte[(int) dataPacket.getSize()];
StreamUtils.fillBuffer(inStream, data);
final Map<String, String> attributes = dataPacket.getAttributes();
final NiFiDataPacket NiFiDataPacket = new StandardNiFiDataPacket(data, attributes);
dataPackets.add(NiFiDataPacket);
dataPacket = transaction.receive();
} while (dataPacket != null);
// Confirm transaction to verify the data
transaction.confirm();
store(dataPackets.iterator());
transaction.complete();
}
} finally {
try {
client.close();
} catch (final IOException ioe) {
reportError("Failed to close client", ioe);
}
}
} catch (final IOException ioe) {
restart("Failed to receive data from NiFi", ioe);
}
}
}
}