---
date: "2015-02-09T12:00:00Z"
title: Introducing Flink Streaming
aliases:
- /news/2015/02/09/streaming-example.html
---
This post is the first of a series of blog posts on Flink Streaming,
the recent addition to Apache Flink that makes it possible to analyze
continuous data sources in addition to static files. Flink Streaming
uses the pipelined Flink engine to process data streams in real time
and offers a new API, including the definition of flexible windows.
In this post, we go through an example that uses the Flink Streaming
API to compute statistics on stock market data that arrive
continuously and combine the stock market data with Twitter streams.
See the [Streaming Programming
Guide]({{< param DocsBaseUrl >}}flink-docs-master/apis/streaming/index.html) for a
detailed presentation of the Streaming API.
First, we read a bunch of stock price streams and combine them into
one stream of market data. We apply several transformations on this
market data stream, like rolling aggregations per stock. Then we emit
price warning alerts when the prices are rapidly changing. Moving
towards more advanced features, we compute rolling correlations
between the market data streams and a Twitter stream with stock mentions.
To run the example implementation, please use the *0.9-SNAPSHOT*
version of Flink as a dependency. The full example code can be
found [here](https://github.com/mbalassi/flink/blob/stockprices/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala) in Scala and [here](https://github.com/mbalassi/flink/blob/stockprices/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java) in Java 7.
<a href="#top"></a>
Reading from multiple inputs
---------------
First, let us create the stream of stock prices:
1. Read a socket stream of stock prices
1. Parse the text in the stream to create a stream of `StockPrice` objects
1. Add four other sources tagged with the stock symbol.
1. Finally, merge the streams to create a unified stream.
<img alt="Reading from multiple inputs" src="/img/blog/blog_multi_input.png" width="70%" class="img-responsive center-block">
<div class="codetabs" markdown="1">
<div data-lang="scala" markdown="1">
{{< highlight scala >}}
def main(args: Array[String]) {

  val env = StreamExecutionEnvironment.getExecutionEnvironment

  //Read from a socket stream and map it to StockPrice objects
  val socketStockStream = env.socketTextStream("localhost", 9999).map(x => {
    val split = x.split(",")
    StockPrice(split(0), split(1).toDouble)
  })

  //Generate other stock streams
  val SPX_Stream = env.addSource(generateStock("SPX")(10) _)
  val FTSE_Stream = env.addSource(generateStock("FTSE")(20) _)
  val DJI_Stream = env.addSource(generateStock("DJI")(30) _)
  val BUX_Stream = env.addSource(generateStock("BUX")(40) _)

  //Merge all stock streams together
  val stockStream = socketStockStream.merge(SPX_Stream, FTSE_Stream,
    DJI_Stream, BUX_Stream)

  stockStream.print()

  env.execute("Stock stream")
}
{{< / highlight >}}
</div>
<div data-lang="java7" markdown="1">
{{< highlight java >}}
public static void main(String[] args) throws Exception {

    final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    //Read from a socket stream and map it to StockPrice objects
    DataStream<StockPrice> socketStockStream = env
        .socketTextStream("localhost", 9999)
        .map(new MapFunction<String, StockPrice>() {
            private String[] tokens;

            @Override
            public StockPrice map(String value) throws Exception {
                tokens = value.split(",");
                return new StockPrice(tokens[0],
                    Double.parseDouble(tokens[1]));
            }
        });

    //Generate other stock streams
    DataStream<StockPrice> SPX_stream = env.addSource(new StockSource("SPX", 10));
    DataStream<StockPrice> FTSE_stream = env.addSource(new StockSource("FTSE", 20));
    DataStream<StockPrice> DJI_stream = env.addSource(new StockSource("DJI", 30));
    DataStream<StockPrice> BUX_stream = env.addSource(new StockSource("BUX", 40));

    //Merge all stock streams together
    DataStream<StockPrice> stockStream = socketStockStream
        .merge(SPX_stream, FTSE_stream, DJI_stream, BUX_stream);

    stockStream.print();

    env.execute("Stock stream");
}
{{< / highlight >}}
</div>
</div>
See
[here]({{< param DocsBaseUrl >}}flink-docs-master/apis/streaming/index.html#data-sources)
for how to create streaming sources for Flink Streaming
programs. Flink, of course, has support for reading in streams from
[external
sources]({{< param DocsBaseUrl >}}flink-docs-master/apis/streaming/connectors/index.html)
such as Apache Kafka, Apache Flume, RabbitMQ, and others. For the sake
of this example, the data streams are simply generated using the
`generateStock` method:
<div class="codetabs" markdown="1">
<div data-lang="scala" markdown="1">
{{< highlight scala >}}
val symbols = List("SPX", "FTSE", "DJI", "DJT", "BUX", "DAX", "GOOG")
case class StockPrice(symbol: String, price: Double)
def generateStock(symbol: String)(sigma: Int)(out: Collector[StockPrice]) = {
  var price = 1000.0
  while (true) {
    price = price + Random.nextGaussian * sigma
    out.collect(StockPrice(symbol, price))
    Thread.sleep(Random.nextInt(200))
  }
}
{{< / highlight >}}
</div>
<div data-lang="java7" markdown="1">
{{< highlight java >}}
private static final ArrayList<String> SYMBOLS = new ArrayList<String>(
    Arrays.asList("SPX", "FTSE", "DJI", "DJT", "BUX", "DAX", "GOOG"));

public static class StockPrice implements Serializable {

    public String symbol;
    public Double price;

    public StockPrice() {
    }

    public StockPrice(String symbol, Double price) {
        this.symbol = symbol;
        this.price = price;
    }

    @Override
    public String toString() {
        return "StockPrice{" +
            "symbol='" + symbol + '\'' +
            ", count=" + price +
            '}';
    }
}

public final static class StockSource implements SourceFunction<StockPrice> {

    private Double price;
    private String symbol;
    private Integer sigma;

    public StockSource(String symbol, Integer sigma) {
        this.symbol = symbol;
        this.sigma = sigma;
    }

    @Override
    public void invoke(Collector<StockPrice> collector) throws Exception {
        price = DEFAULT_PRICE;
        Random random = new Random();

        while (true) {
            price = price + random.nextGaussian() * sigma;
            collector.collect(new StockPrice(symbol, price));
            Thread.sleep(random.nextInt(200));
        }
    }
}
{{< / highlight >}}
</div>
</div>
To read from the text socket stream, please make sure that you have a
socket running. For the purposes of this example, executing the
following command in a terminal does the job. You can get
[netcat](http://netcat.sourceforge.net/) if it is not available
on your machine.
```
nc -lk 9999
```
If we execute the program from our IDE, we see the
stock prices being generated:
```
INFO Job execution switched to status RUNNING.
INFO Socket Stream(1/1) switched to SCHEDULED
INFO Socket Stream(1/1) switched to DEPLOYING
INFO Custom Source(1/1) switched to SCHEDULED
INFO Custom Source(1/1) switched to DEPLOYING
1> StockPrice{symbol='SPX', count=1011.3405732645239}
2> StockPrice{symbol='SPX', count=1018.3381290039248}
1> StockPrice{symbol='DJI', count=1036.7454894073978}
3> StockPrice{symbol='DJI', count=1135.1170217478427}
3> StockPrice{symbol='BUX', count=1053.667523187687}
4> StockPrice{symbol='BUX', count=1036.552601487263}
```
[Back to top](#top)
Window aggregations
---------------
We first compute aggregations on time-based windows of the
data. Flink provides [flexible windowing semantics]({{< param DocsBaseUrl >}}flink-docs-master/apis/streaming/windows.html) where windows can
also be defined based on count of records or any custom user defined
logic.
We partition our stream into windows of 10 seconds and slide the
window every 5 seconds. We compute three statistics every 5 seconds.
The first is the minimum price of all stocks, the second is the
maximum price per stock, and the third is the mean stock price
(using a map window function). Aggregations and groupings can be
performed on named fields of POJOs, making the code more readable.
<img alt="Basic windowing aggregations" src="/img/blog/blog_basic_window.png" width="70%" class="img-responsive center-block">
<div class="codetabs" markdown="1">
<div data-lang="scala" markdown="1">
{{< highlight scala >}}
//Define the desired time window
val windowedStream = stockStream
  .window(Time.of(10, SECONDS)).every(Time.of(5, SECONDS))

//Compute some simple statistics on a rolling window
val lowest = windowedStream.minBy("price")
val maxByStock = windowedStream.groupBy("symbol").maxBy("price")
val rollingMean = windowedStream.groupBy("symbol").mapWindow(mean _)

//Compute the mean of a window
def mean(ts: Iterable[StockPrice], out: Collector[StockPrice]) = {
  if (ts.nonEmpty) {
    out.collect(StockPrice(ts.head.symbol, ts.foldLeft(0: Double)(_ + _.price) / ts.size))
  }
}
{{< / highlight >}}
</div>
<div data-lang="java7" markdown="1">
{{< highlight java >}}
//Define the desired time window
WindowedDataStream<StockPrice> windowedStream = stockStream
    .window(Time.of(10, TimeUnit.SECONDS))
    .every(Time.of(5, TimeUnit.SECONDS));

//Compute some simple statistics on a rolling window
DataStream<StockPrice> lowest = windowedStream.minBy("price").flatten();
DataStream<StockPrice> maxByStock = windowedStream.groupBy("symbol")
    .maxBy("price").flatten();
DataStream<StockPrice> rollingMean = windowedStream.groupBy("symbol")
    .mapWindow(new WindowMean()).flatten();

//Compute the mean of a window
public final static class WindowMean implements
        WindowMapFunction<StockPrice, StockPrice> {

    private Double sum = 0.0;
    private Integer count = 0;
    private String symbol = "";

    @Override
    public void mapWindow(Iterable<StockPrice> values, Collector<StockPrice> out)
            throws Exception {
        if (values.iterator().hasNext()) {
            for (StockPrice sp : values) {
                sum += sp.price;
                symbol = sp.symbol;
                count++;
            }
            out.collect(new StockPrice(symbol, sum / count));
        }
    }
}
{{< / highlight >}}
</div>
</div>
Note that to print a windowed stream, one first has to flatten it,
thus getting rid of the windowing logic. For example, execute
`maxByStock.flatten().print()` to print the stream of maximum prices
per stock for each time window. In Scala, `flatten()` is called
implicitly when needed.
[Back to top](#top)
Data-driven windows
---------------
The most interesting event in the stream is when the price of a stock
is changing rapidly. We can send a warning when a stock price changes
more than 5% since the last warning. To do that, we use a delta-based window providing a
threshold on when the computation will be triggered, a function to
compute the difference, and a default value with which the first record
is compared. We also create a `Count` data type to count the warnings
every 30 seconds.
<img alt="Data-driven windowing semantics" src="/img/blog/blog_data_driven.png" width="100%" class="img-responsive center-block">
<div class="codetabs" markdown="1">
<div data-lang="scala" markdown="1">
{{< highlight scala >}}
case class Count(symbol: String, count: Int)
val defaultPrice = StockPrice("", 1000)

//Use delta policy to create price change warnings
val priceWarnings = stockStream.groupBy("symbol")
  .window(Delta.of(0.05, priceChange, defaultPrice))
  .mapWindow(sendWarning _)

//Count the number of warnings every half a minute
val warningsPerStock = priceWarnings.map(Count(_, 1))
  .groupBy("symbol")
  .window(Time.of(30, SECONDS))
  .sum("count")

def priceChange(p1: StockPrice, p2: StockPrice): Double = {
  Math.abs(p1.price / p2.price - 1)
}

def sendWarning(ts: Iterable[StockPrice], out: Collector[String]) = {
  if (ts.nonEmpty) out.collect(ts.head.symbol)
}
{{< / highlight >}}
</div>
<div data-lang="java7" markdown="1">
{{< highlight java >}}
private static final Double DEFAULT_PRICE = 1000.;
private static final StockPrice DEFAULT_STOCK_PRICE = new StockPrice("", DEFAULT_PRICE);

//Use delta policy to create price change warnings
DataStream<String> priceWarnings = stockStream.groupBy("symbol")
    .window(Delta.of(0.05, new DeltaFunction<StockPrice>() {
        @Override
        public double getDelta(StockPrice oldDataPoint, StockPrice newDataPoint) {
            return Math.abs(oldDataPoint.price / newDataPoint.price - 1);
        }
    }, DEFAULT_STOCK_PRICE))
    .mapWindow(new SendWarning()).flatten();

//Count the number of warnings every half a minute
DataStream<Count> warningsPerStock = priceWarnings.map(new MapFunction<String, Count>() {
    @Override
    public Count map(String value) throws Exception {
        return new Count(value, 1);
    }
}).groupBy("symbol").window(Time.of(30, TimeUnit.SECONDS)).sum("count").flatten();

public static class Count implements Serializable {

    public String symbol;
    public Integer count;

    public Count() {
    }

    public Count(String symbol, Integer count) {
        this.symbol = symbol;
        this.count = count;
    }

    @Override
    public String toString() {
        return "Count{" +
            "symbol='" + symbol + '\'' +
            ", count=" + count +
            '}';
    }
}

public static final class SendWarning implements MapWindowFunction<StockPrice, String> {

    @Override
    public void mapWindow(Iterable<StockPrice> values, Collector<String> out)
            throws Exception {
        if (values.iterator().hasNext()) {
            out.collect(values.iterator().next().symbol);
        }
    }
}
{{< / highlight >}}
</div>
</div>
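The trigger logic of the delta policy can also be replayed outside Flink. The following plain-Java sketch (the class and the `warnings` helper are ours, for illustration only) emits a warning whenever the relative price change since the last trigger exceeds the 5% threshold, starting from the default price:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DeltaWarnings {
    static final double THRESHOLD = 0.05;

    // Relative price change, mirroring the priceChange helper above
    static double priceChange(double oldPrice, double newPrice) {
        return Math.abs(oldPrice / newPrice - 1);
    }

    // Replay the delta trigger over a price series: emit the symbol
    // whenever the change since the last trigger exceeds the threshold
    static List<String> warnings(String symbol, List<Double> prices, double defaultPrice) {
        List<String> emitted = new ArrayList<String>();
        double last = defaultPrice;
        for (double p : prices) {
            if (priceChange(last, p) > THRESHOLD) {
                emitted.add(symbol);
                last = p; // the triggering record becomes the new reference
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // 1030 (+3%, quiet), 1060 (+6%, warn), 1100 (+3.8% vs 1060, quiet),
        // 1130 (+6.6% vs 1060, warn)
        List<Double> prices = Arrays.asList(1030., 1060., 1100., 1130.);
        System.out.println(warnings("SPX", prices, 1000.));
    }
}
```

Note how the reference price only advances when a warning fires, so small drifts accumulate until they cross the threshold together.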
[Back to top](#top)
Combining with a Twitter stream
---------------
Next, we will read a Twitter stream and correlate it with our stock
price stream. Flink has support for connecting to [Twitter's
API]({{< param DocsBaseUrl >}}flink-docs-master/apis/streaming/connectors/twitter.html)
but for the sake of this example we generate dummy tweet data.
<img alt="Social media analytics" src="/img/blog/blog_social_media.png" width="100%" class="img-responsive center-block">
<div class="codetabs" markdown="1">
<div data-lang="scala" markdown="1">
{{< highlight scala >}}
//Read a stream of tweets
val tweetStream = env.addSource(generateTweets _)

//Extract the stock symbols
val mentionedSymbols = tweetStream.flatMap(tweet => tweet.split(" "))
  .map(_.toUpperCase())
  .filter(symbols.contains(_))

//Count the extracted symbols
val tweetsPerStock = mentionedSymbols.map(Count(_, 1))
  .groupBy("symbol")
  .window(Time.of(30, SECONDS))
  .sum("count")

def generateTweets(out: Collector[String]) = {
  while (true) {
    val s = for (i <- 1 to 3) yield (symbols(Random.nextInt(symbols.size)))
    out.collect(s.mkString(" "))
    Thread.sleep(Random.nextInt(500))
  }
}
{{< / highlight >}}
</div>
<div data-lang="java7" markdown="1">
{{< highlight java >}}
//Read a stream of tweets
DataStream<String> tweetStream = env.addSource(new TweetSource());

//Extract the stock symbols
DataStream<String> mentionedSymbols = tweetStream.flatMap(
    new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            String[] words = value.split(" ");
            for (String word : words) {
                out.collect(word.toUpperCase());
            }
        }
    }).filter(new FilterFunction<String>() {
        @Override
        public boolean filter(String value) throws Exception {
            return SYMBOLS.contains(value);
        }
    });

//Count the extracted symbols
DataStream<Count> tweetsPerStock = mentionedSymbols.map(new MapFunction<String, Count>() {
    @Override
    public Count map(String value) throws Exception {
        return new Count(value, 1);
    }
}).groupBy("symbol").window(Time.of(30, TimeUnit.SECONDS)).sum("count").flatten();

public static final class TweetSource implements SourceFunction<String> {

    Random random;
    StringBuilder stringBuilder;

    @Override
    public void invoke(Collector<String> collector) throws Exception {
        random = new Random();
        stringBuilder = new StringBuilder();

        while (true) {
            stringBuilder.setLength(0);
            for (int i = 0; i < 3; i++) {
                stringBuilder.append(" ");
                stringBuilder.append(SYMBOLS.get(random.nextInt(SYMBOLS.size())));
            }
            collector.collect(stringBuilder.toString());
            Thread.sleep(500);
        }
    }
}
{{< / highlight >}}
</div>
</div>
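The symbol extraction above is an ordinary flatMap-then-filter. Applied to a single tweet outside Flink, the same logic can be sketched as follows (the class and the `extractSymbols` helper are ours, for illustration only):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SymbolExtraction {
    static final List<String> SYMBOLS = Arrays.asList(
        "SPX", "FTSE", "DJI", "DJT", "BUX", "DAX", "GOOG");

    // flatMap step (split into words, uppercase) followed by the
    // filter step (keep only known stock symbols)
    static List<String> extractSymbols(String tweet) {
        List<String> found = new ArrayList<String>();
        for (String word : tweet.split(" ")) {
            String upper = word.toUpperCase();
            if (SYMBOLS.contains(upper)) {
                found.add(upper);
            }
        }
        return found;
    }

    public static void main(String[] args) {
        // Only the recognized symbols survive the filter
        System.out.println(extractSymbols("spx and dji"));
    }
}
```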
[Back to top](#top)
Streaming joins
---------------
Finally, we join real-time tweets and stock prices and compute a
rolling correlation between the number of price warnings and the
number of mentions of a given stock in the Twitter stream. As both of
these data streams are potentially infinite, we apply the join on a
30-second window.
<img alt="Streaming joins" src="/img/blog/blog_stream_join.png" width="60%" class="img-responsive center-block">
<div class="codetabs" markdown="1">
<div data-lang="scala" markdown="1">
{{< highlight scala >}}
//Join warnings and parsed tweets
val tweetsAndWarning = warningsPerStock.join(tweetsPerStock)
  .onWindow(30, SECONDS)
  .where("symbol")
  .equalTo("symbol") { (c1, c2) => (c1.count, c2.count) }

val rollingCorrelation = tweetsAndWarning.window(Time.of(30, SECONDS))
  .mapWindow(computeCorrelation _)

rollingCorrelation.print()

//Compute rolling correlation
def computeCorrelation(input: Iterable[(Int, Int)], out: Collector[Double]) = {
  if (input.nonEmpty) {
    val var1 = input.map(_._1)
    val mean1 = average(var1)
    val var2 = input.map(_._2)
    val mean2 = average(var2)

    val cov = average(var1.zip(var2).map(xy => (xy._1 - mean1) * (xy._2 - mean2)))
    val d1 = Math.sqrt(average(var1.map(x => Math.pow((x - mean1), 2))))
    val d2 = Math.sqrt(average(var2.map(x => Math.pow((x - mean2), 2))))

    out.collect(cov / (d1 * d2))
  }
}
{{< / highlight >}}
</div>
<div data-lang="java7" markdown="1">
{{< highlight java >}}
//Join warnings and parsed tweets
DataStream<Tuple2<Integer, Integer>> tweetsAndWarning = warningsPerStock
    .join(tweetsPerStock)
    .onWindow(30, TimeUnit.SECONDS)
    .where("symbol")
    .equalTo("symbol")
    .with(new JoinFunction<Count, Count, Tuple2<Integer, Integer>>() {
        @Override
        public Tuple2<Integer, Integer> join(Count first, Count second) throws Exception {
            return new Tuple2<Integer, Integer>(first.count, second.count);
        }
    });

//Compute rolling correlation
DataStream<Double> rollingCorrelation = tweetsAndWarning
    .window(Time.of(30, TimeUnit.SECONDS))
    .mapWindow(new WindowCorrelation());

rollingCorrelation.print();

public static final class WindowCorrelation
        implements WindowMapFunction<Tuple2<Integer, Integer>, Double> {

    private Integer leftSum;
    private Integer rightSum;
    private Integer count;

    private Double leftMean;
    private Double rightMean;

    private Double cov;
    private Double leftSd;
    private Double rightSd;

    @Override
    public void mapWindow(Iterable<Tuple2<Integer, Integer>> values, Collector<Double> out)
            throws Exception {
        leftSum = 0;
        rightSum = 0;
        count = 0;

        cov = 0.;
        leftSd = 0.;
        rightSd = 0.;

        //compute mean for both sides, save count
        for (Tuple2<Integer, Integer> pair : values) {
            leftSum += pair.f0;
            rightSum += pair.f1;
            count++;
        }
        leftMean = leftSum.doubleValue() / count;
        rightMean = rightSum.doubleValue() / count;

        //compute covariance & std. deviations
        for (Tuple2<Integer, Integer> pair : values) {
            cov += (pair.f0 - leftMean) * (pair.f1 - rightMean) / count;
        }

        for (Tuple2<Integer, Integer> pair : values) {
            leftSd += Math.pow(pair.f0 - leftMean, 2) / count;
            rightSd += Math.pow(pair.f1 - rightMean, 2) / count;
        }
        leftSd = Math.sqrt(leftSd);
        rightSd = Math.sqrt(rightSd);

        out.collect(cov / (leftSd * rightSd));
    }
}
{{< / highlight >}}
</div>
</div>
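The quantity computed per window is the standard Pearson correlation coefficient. A compact standalone version, handy for sanity-checking window results against known inputs (the class and helper are ours, not Flink API code):

```java
import java.util.Arrays;
import java.util.List;

public class Pearson {
    // Pearson correlation of paired samples: covariance divided by the
    // product of the standard deviations, as in WindowCorrelation
    static double correlation(List<int[]> pairs) {
        int n = pairs.size();
        double meanX = 0, meanY = 0;
        for (int[] p : pairs) {
            meanX += p[0];
            meanY += p[1];
        }
        meanX /= n;
        meanY /= n;

        double cov = 0, varX = 0, varY = 0;
        for (int[] p : pairs) {
            cov += (p[0] - meanX) * (p[1] - meanY) / n;
            varX += Math.pow(p[0] - meanX, 2) / n;
            varY += Math.pow(p[1] - meanY, 2) / n;
        }
        return cov / (Math.sqrt(varX) * Math.sqrt(varY));
    }

    public static void main(String[] args) {
        // A perfectly correlated sample yields a coefficient of approx. 1.0
        List<int[]> pairs = Arrays.asList(
            new int[]{1, 2}, new int[]{2, 4}, new int[]{3, 6});
        System.out.println(correlation(pairs));
    }
}
```

A coefficient near 1 means warnings and tweet mentions rise and fall together within the window; near -1 means they move in opposite directions.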
[Back to top](#top)
Other things to try
---------------
For a full feature overview, please check the [Streaming Guide]({{< param DocsBaseUrl >}}flink-docs-master/apis/streaming/index.html), which describes all the available API features.
You are very welcome to try out our features for different use cases; we are looking forward to hearing about your experiences. Feel free to [contact us](http://flink.apache.org/community.html#mailing-lists).
Upcoming for streaming
---------------
There are some aspects of Flink Streaming that are subject to
change by the next release, which will make this application look even nicer.
Stay tuned for later blog posts on how Flink Streaming works
internally, fault tolerance, and performance measurements!
[Back to top](#top)