| |
| <!DOCTYPE html> |
| <html lang="en" dir="ltr"> |
| |
| <head> |
| |
| |
| |
| <link rel="stylesheet" href="/bootstrap/css/bootstrap.min.css"> |
| <script src="/bootstrap/js/bootstrap.bundle.min.js"></script> |
| <link rel="stylesheet" type="text/css" href="/font-awesome/css/font-awesome.min.css"> |
| <script src="/js/anchor.min.js"></script> |
| <script src="/js/flink.js"></script> |
| <link rel="canonical" href="https://flink.apache.org/2015/02/09/introducing-flink-streaming/"> |
| |
| <meta charset="UTF-8"> |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
| <meta name="description" content="This post is the first of a series of blog posts on Flink Streaming, the recent addition to Apache Flink that makes it possible to analyze continuous data sources in addition to static files. Flink Streaming uses the pipelined Flink engine to process data streams in real time and offers a new API including definition of flexible windows. |
| In this post, we go through an example that uses the Flink Streaming API to compute statistics on stock market data that arrive continuously and combine the stock market data with Twitter streams."> |
| <meta name="theme-color" content="#FFFFFF"><meta property="og:title" content="Introducing Flink Streaming" /> |
| <meta property="og:description" content="This post is the first of a series of blog posts on Flink Streaming, the recent addition to Apache Flink that makes it possible to analyze continuous data sources in addition to static files. Flink Streaming uses the pipelined Flink engine to process data streams in real time and offers a new API including definition of flexible windows. |
| In this post, we go through an example that uses the Flink Streaming API to compute statistics on stock market data that arrive continuously and combine the stock market data with Twitter streams." /> |
| <meta property="og:type" content="article" /> |
| <meta property="og:url" content="https://flink.apache.org/2015/02/09/introducing-flink-streaming/" /><meta property="article:section" content="posts" /> |
| <meta property="article:published_time" content="2015-02-09T12:00:00+00:00" /> |
| <meta property="article:modified_time" content="2015-02-09T12:00:00+00:00" /> |
| <title>Introducing Flink Streaming | Apache Flink</title> |
| <link rel="manifest" href="/manifest.json"> |
| <link rel="icon" href="/favicon.png" type="image/png"> |
| <link rel="stylesheet" href="/book.min.22eceb4d17baa9cdc0f57345edd6f215a40474022dfee39b63befb5fb3c596b5.css" integrity="sha256-IuzrTRe6qc3A9XNF7dbyFaQEdAIt/uObY777X7PFlrU="> |
| <script defer src="/en.search.min.2698f0d1b683dae4d6cb071668b310a55ebcf1c48d11410a015a51d90105b53e.js" integrity="sha256-Jpjw0baD2uTWywcWaLMQpV688cSNEUEKAVpR2QEFtT4="></script> |
| <!-- |
| Made with Book Theme |
| https://github.com/alex-shpak/hugo-book |
| --> |
| |
| <meta name="generator" content="Hugo 0.124.1"> |
| |
| |
| <script> |
| // Matomo analytics bootstrap. Commands are queued on the global _paq array, |
| // which is reused if it already exists and created empty otherwise. |
| var _paq = window._paq = window._paq || []; |
| |
| |
| // Queued ahead of the first tracking call; per Matomo's JS tracker docs this |
| // command turns off tracking cookies (confirm against the deployed Matomo version). |
| _paq.push(['disableCookies']); |
| |
| // Queue 'setDomains' with the two host patterns below -- presumably the hosts |
| // Matomo should treat as belonging to this site; verify against the Matomo docs. |
| _paq.push(["setDomains", ["*.flink.apache.org","*.nightlies.apache.org/flink"]]); |
| // Queue the page-view and link-tracking commands. |
| _paq.push(['trackPageView']); |
| _paq.push(['enableLinkTracking']); |
| // The function wrapper keeps u, d, g and s out of the global scope. |
| (function() { |
| // Protocol-relative base URL: it resolves with the scheme of the embedding page. |
| var u="//analytics.apache.org/"; |
| // The tracker endpoint and the site id are queued as commands as well. |
| _paq.push(['setTrackerUrl', u+'matomo.php']); |
| _paq.push(['setSiteId', '1']); |
| // Create an async <script> element for matomo.js and insert it before the |
| // first <script> element found in the document. |
| var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0]; |
| g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s); |
| })(); |
| </script> |
| |
| </head> |
| |
| <body dir="ltr"> |
| |
| |
| |
| <header> |
| <nav class="navbar navbar-expand-xl"> |
| <div class="container-fluid"> |
| <a class="navbar-brand" href="/"> |
| <img src="/img/logo/png/100/flink_squirrel_100_color.png" alt="Apache Flink" height="47" width="47" class="d-inline-block align-text-middle"> |
| <span>Apache Flink</span> |
| </a> |
| <button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent" aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation"> |
| <i class="fa fa-bars navbar-toggler-icon"></i> |
| </button> |
| <div class="collapse navbar-collapse" id="navbarSupportedContent"> |
| <ul class="navbar-nav"> |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| <li class="nav-item dropdown"> |
| <a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">About</a> |
| <ul class="dropdown-menu"> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/what-is-flink/flink-architecture/">Architecture</a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/what-is-flink/flink-applications/">Applications</a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/what-is-flink/flink-operations/">Operations</a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/what-is-flink/use-cases/">Use Cases</a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/what-is-flink/powered-by/">Powered By</a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/what-is-flink/roadmap/">Roadmap</a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/what-is-flink/community/">Community & Project Info</a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/what-is-flink/security/">Security</a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/what-is-flink/special-thanks/">Special Thanks</a> |
| |
| |
| </li> |
| |
| </ul> |
| </li> |
| |
| |
| |
| |
| |
| <li class="nav-item dropdown"> |
| <a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Getting Started</a> |
| <ul class="dropdown-menu"> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/">With Flink<i class="link fa fa-external-link title" aria-hidden="true"></i> |
| </a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/docs/try-flink-kubernetes-operator/quick-start/">With Flink Kubernetes Operator<i class="link fa fa-external-link title" aria-hidden="true"></i> |
| </a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable/docs/get-started/introduction/">With Flink CDC<i class="link fa fa-external-link title" aria-hidden="true"></i> |
| </a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/docs/try-flink-ml/quick-start/">With Flink ML<i class="link fa fa-external-link title" aria-hidden="true"></i> |
| </a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/getting-started/project-setup.html">With Flink Stateful Functions<i class="link fa fa-external-link title" aria-hidden="true"></i> |
| </a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/overview/">Training Course<i class="link fa fa-external-link title" aria-hidden="true"></i> |
| </a> |
| |
| |
| </li> |
| |
| </ul> |
| </li> |
| |
| |
| |
| |
| |
| <li class="nav-item dropdown"> |
| <a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Documentation</a> |
| <ul class="dropdown-menu"> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/">Flink 1.19 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i> |
| </a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-master/">Flink Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i> |
| </a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/">Kubernetes Operator 1.8 (latest)<i class="link fa fa-external-link title" aria-hidden="true"></i> |
| </a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main">Kubernetes Operator Main (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i> |
| </a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable">CDC 3.0 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i> |
| </a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-master">CDC Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i> |
| </a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/">ML 2.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i> |
| </a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-master">ML Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i> |
| </a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/">Stateful Functions 3.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i> |
| </a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-master">Stateful Functions Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i> |
| </a> |
| |
| |
| </li> |
| |
| </ul> |
| </li> |
| |
| |
| |
| |
| |
| <li class="nav-item dropdown"> |
| <a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">How to Contribute</a> |
| <ul class="dropdown-menu"> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/how-to-contribute/overview/">Overview</a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/how-to-contribute/contribute-code/">Contribute Code</a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/how-to-contribute/reviewing-prs/">Review Pull Requests</a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/how-to-contribute/code-style-and-quality-preamble/">Code Style and Quality Guide</a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/how-to-contribute/contribute-documentation/">Contribute Documentation</a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/how-to-contribute/documentation-style-guide/">Documentation Style Guide</a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/how-to-contribute/improve-website/">Contribute to the Website</a> |
| |
| |
| </li> |
| |
| <li> |
| |
| |
| <a class="dropdown-item" href="/how-to-contribute/getting-help/">Getting Help</a> |
| |
| |
| </li> |
| |
| </ul> |
| </li> |
| |
| |
| |
| |
| |
| |
| |
| |
| <li class="nav-item"> |
| |
| |
| <a class="nav-link" href="/posts/">Flink Blog</a> |
| |
| |
| </li> |
| |
| |
| |
| |
| |
| <li class="nav-item"> |
| |
| |
| <a class="nav-link" href="/downloads/">Downloads</a> |
| |
| |
| </li> |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| </ul> |
| <div class="book-search"> |
| <div class="book-search-spinner hidden"> |
| <i class="fa fa-refresh fa-spin"></i> |
| </div> |
| <form class="search-bar d-flex" onsubmit="return false;"> |
| <input type="text" id="book-search-input" placeholder="Search" aria-label="Search" maxlength="64" data-hotkeys="s/"> |
| <i class="fa fa-search search"></i> |
| <i class="fa fa-circle-o-notch fa-spin spinner"></i> |
| </form> |
| <div class="book-search-spinner hidden"></div> |
| <ul id="book-search-results"></ul> |
| </div> |
| </div> |
| </div> |
| </nav> |
| <div class="navbar-clearfix"></div> |
| </header> |
| |
| |
| <main class="flex"> |
| <section class="container book-page"> |
| |
| <article class="markdown"> |
| <h1> |
| <a href="/2015/02/09/introducing-flink-streaming/">Introducing Flink Streaming</a> |
| </h1> |
| |
| |
| |
| February 9, 2015 - |
| |
| |
| |
| |
| |
| <p>This post is the first of a series of blog posts on Flink Streaming, |
| the recent addition to Apache Flink that makes it possible to analyze |
| continuous data sources in addition to static files. Flink Streaming |
| uses the pipelined Flink engine to process data streams in real time |
| and offers a new API including definition of flexible windows.</p> |
| <p>In this post, we go through an example that uses the Flink Streaming |
| API to compute statistics on stock market data that arrive |
| continuously and combine the stock market data with Twitter streams. |
| See the <a href="https://nightlies.apache.org/flink/flink-docs-master/apis/streaming/index.html">Streaming Programming |
| Guide</a> for a |
| detailed presentation of the Streaming API.</p> |
| <p>First, we read a bunch of stock price streams and combine them into |
| one stream of market data. We apply several transformations on this |
| market data stream, like rolling aggregations per stock. Then we emit |
| price warning alerts when the prices are rapidly changing. Moving |
| towards more advanced features, we compute rolling correlations |
| between the market data streams and a Twitter stream with stock mentions.</p> |
| <p>For running the example implementation please use the <em>0.9-SNAPSHOT</em> |
| version of Flink as a dependency. The full example code base can be |
| found <a href="https://github.com/mbalassi/flink/blob/stockprices/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/windowing/StockPrices.scala">here</a> in Scala and <a href="https://github.com/mbalassi/flink/blob/stockprices/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java">here</a> in Java7.</p> |
| <p><a href="#top"></a></p> |
| <p><a href="#top">Back to top</a></p> |
| <h2 id="reading-from-multiple-inputs"> |
| Reading from multiple inputs |
| <a class="anchor" href="#reading-from-multiple-inputs">#</a> |
| </h2> |
| <p>First, let us create the stream of stock prices:</p> |
| <ol> |
| <li>Read a socket stream of stock prices</li> |
| <li>Parse the text in the stream to create a stream of <code>StockPrice</code> objects</li> |
| <li>Add four other sources tagged with the stock symbol.</li> |
| <li>Finally, merge the streams to create a unified stream.</li> |
| </ol> |
| <img alt="Reading from multiple inputs" src="/img/blog/blog_multi_input.png" width="70%" class="img-responsive center-block"> |
| <div class="codetabs" markdown="1"> |
| <div data-lang="scala" markdown="1"> |
| <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-scala" data-lang="scala"><span class="line"><span class="cl"><span class="k">def</span> <span class="n">main</span><span class="o">(</span><span class="n">args</span><span class="k">:</span> <span class="kt">Array</span><span class="o">[</span><span class="kt">String</span><span class="o">])</span> <span class="o">{</span> |
| </span></span><span class="line"><span class="cl"> |
| </span></span><span class="line"><span class="cl"> <span class="k">val</span> <span class="n">env</span> <span class="k">=</span> <span class="nc">StreamExecutionEnvironment</span><span class="o">.</span><span class="n">getExecutionEnvironment</span> |
| </span></span><span class="line"><span class="cl"> |
| </span></span><span class="line"><span class="cl"> <span class="c1">//Read from a socket stream and map it to StockPrice objects |
| </span></span></span><span class="line"><span class="cl"><span class="c1"></span> <span class="k">val</span> <span class="n">socketStockStream</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">socketTextStream</span><span class="o">(</span><span class="s">"localhost"</span><span class="o">,</span> <span class="mi">9999</span><span class="o">).</span><span class="n">map</span><span class="o">(</span><span class="n">x</span> <span class="k">=></span> <span class="o">{</span> |
| </span></span><span class="line"><span class="cl"> <span class="k">val</span> <span class="n">split</span> <span class="k">=</span> <span class="n">x</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">","</span><span class="o">)</span> |
| </span></span><span class="line"><span class="cl"> <span class="nc">StockPrice</span><span class="o">(</span><span class="n">split</span><span class="o">(</span><span class="mi">0</span><span class="o">),</span> <span class="n">split</span><span class="o">(</span><span class="mi">1</span><span class="o">).</span><span class="n">toDouble</span><span class="o">)</span> |
| </span></span><span class="line"><span class="cl"> <span class="o">})</span> |
| </span></span><span class="line"><span class="cl"> |
| </span></span><span class="line"><span class="cl"> <span class="c1">//Generate other stock streams |
| </span></span></span><span class="line"><span class="cl"><span class="c1"></span> <span class="k">val</span> <span class="nc">SPX_Stream</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">addSource</span><span class="o">(</span><span class="n">generateStock</span><span class="o">(</span><span class="s">"SPX"</span><span class="o">)(</span><span class="mi">10</span><span class="o">)</span> <span class="k">_</span><span class="o">)</span> |
| </span></span><span class="line"><span class="cl"> <span class="k">val</span> <span class="nc">FTSE_Stream</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">addSource</span><span class="o">(</span><span class="n">generateStock</span><span class="o">(</span><span class="s">"FTSE"</span><span class="o">)(</span><span class="mi">20</span><span class="o">)</span> <span class="k">_</span><span class="o">)</span> |
| </span></span><span class="line"><span class="cl"> <span class="k">val</span> <span class="nc">DJI_Stream</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">addSource</span><span class="o">(</span><span class="n">generateStock</span><span class="o">(</span><span class="s">"DJI"</span><span class="o">)(</span><span class="mi">30</span><span class="o">)</span> <span class="k">_</span><span class="o">)</span> |
| </span></span><span class="line"><span class="cl"> <span class="k">val</span> <span class="nc">BUX_Stream</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">addSource</span><span class="o">(</span><span class="n">generateStock</span><span class="o">(</span><span class="s">"BUX"</span><span class="o">)(</span><span class="mi">40</span><span class="o">)</span> <span class="k">_</span><span class="o">)</span> |
| </span></span><span class="line"><span class="cl"> |
| </span></span><span class="line"><span class="cl"> <span class="c1">//Merge all stock streams together |
| </span></span></span><span class="line"><span class="cl"><span class="c1"></span> <span class="k">val</span> <span class="n">stockStream</span> <span class="k">=</span> <span class="n">socketStockStream</span><span class="o">.</span><span class="n">merge</span><span class="o">(</span><span class="nc">SPX_Stream</span><span class="o">,</span> <span class="nc">FTSE_Stream</span><span class="o">,</span> |
| </span></span><span class="line"><span class="cl"> <span class="nc">DJI_Stream</span><span class="o">,</span> <span class="nc">BUX_Stream</span><span class="o">)</span> |
| </span></span><span class="line"><span class="cl"> |
| </span></span><span class="line"><span class="cl"> <span class="n">stockStream</span><span class="o">.</span><span class="n">print</span><span class="o">()</span> |
| </span></span><span class="line"><span class="cl"> |
| </span></span><span class="line"><span class="cl"> <span class="n">env</span><span class="o">.</span><span class="n">execute</span><span class="o">(</span><span class="s">"Stock stream"</span><span class="o">)</span> |
| </span></span><span class="line"><span class="cl"><span class="o">}</span></span></span></code></pre></div> |
| </div> |
| <div data-lang="java7" markdown="1"> |
| <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kd">public</span><span class="w"> </span><span class="kd">static</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">main</span><span class="p">(</span><span class="n">String</span><span class="o">[]</span><span class="w"> </span><span class="n">args</span><span class="p">)</span><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">Exception</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="n">StreamExecutionEnvironment</span><span class="w"> </span><span class="n">env</span><span class="w"> </span><span class="o">=</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">StreamExecutionEnvironment</span><span class="p">.</span><span class="na">getExecutionEnvironment</span><span class="p">();</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">//Read from a socket stream and map it to StockPrice objects</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">DataStream</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span><span class="w"> </span><span class="n">socketStockStream</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">env</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">socketTextStream</span><span class="p">(</span><span class="s">"localhost"</span><span class="p">,</span><span class="w"> </span><span class="n">9999</span><span class="p">)</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">map</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">MapFunction</span><span class="o"><</span><span class="n">String</span><span class="p">,</span><span class="w"> </span><span class="n">StockPrice</span><span class="o">></span><span class="p">()</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="n">String</span><span class="o">[]</span><span class="w"> </span><span class="n">tokens</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">StockPrice</span><span class="w"> </span><span class="nf">map</span><span class="p">(</span><span class="n">String</span><span class="w"> </span><span class="n">value</span><span class="p">)</span><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">Exception</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">tokens</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">value</span><span class="p">.</span><span class="na">split</span><span class="p">(</span><span class="s">","</span><span class="p">);</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">StockPrice</span><span class="p">(</span><span class="n">tokens</span><span class="o">[</span><span class="n">0</span><span class="o">]</span><span class="p">,</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">Double</span><span class="p">.</span><span class="na">parseDouble</span><span class="p">(</span><span class="n">tokens</span><span class="o">[</span><span class="n">1</span><span class="o">]</span><span class="p">));</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">});</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">//Generate other stock streams</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">DataStream</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span><span class="w"> </span><span class="n">SPX_stream</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">env</span><span class="p">.</span><span class="na">addSource</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">StockSource</span><span class="p">(</span><span class="s">"SPX"</span><span class="p">,</span><span class="w"> </span><span class="n">10</span><span class="p">));</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">DataStream</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span><span class="w"> </span><span class="n">FTSE_stream</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">env</span><span class="p">.</span><span class="na">addSource</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">StockSource</span><span class="p">(</span><span class="s">"FTSE"</span><span class="p">,</span><span class="w"> </span><span class="n">20</span><span class="p">));</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">DataStream</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span><span class="w"> </span><span class="n">DJI_stream</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">env</span><span class="p">.</span><span class="na">addSource</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">StockSource</span><span class="p">(</span><span class="s">"DJI"</span><span class="p">,</span><span class="w"> </span><span class="n">30</span><span class="p">));</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">DataStream</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span><span class="w"> </span><span class="n">BUX_stream</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">env</span><span class="p">.</span><span class="na">addSource</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">StockSource</span><span class="p">(</span><span class="s">"BUX"</span><span class="p">,</span><span class="w"> </span><span class="n">40</span><span class="p">));</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">//Merge all stock streams together</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">DataStream</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span><span class="w"> </span><span class="n">stockStream</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">socketStockStream</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">merge</span><span class="p">(</span><span class="n">SPX_stream</span><span class="p">,</span><span class="w"> </span><span class="n">FTSE_stream</span><span class="p">,</span><span class="w"> </span><span class="n">DJI_stream</span><span class="p">,</span><span class="w"> </span><span class="n">BUX_stream</span><span class="p">);</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">stockStream</span><span class="p">.</span><span class="na">print</span><span class="p">();</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">env</span><span class="p">.</span><span class="na">execute</span><span class="p">(</span><span class="s">"Stock stream"</span><span class="p">);</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="p">}</span></span></span></code></pre></div> |
| </div> |
| </div> |
| <p>See |
| <a href="https://nightlies.apache.org/flink/flink-docs-master/apis/streaming/index.html#data-sources">here</a> |
| on how you can create streaming sources for Flink Streaming |
| programs. Flink, of course, has support for reading in streams from |
| <a href="https://nightlies.apache.org/flink/flink-docs-master/apis/streaming/connectors/index.html">external |
| sources</a> |
| such as Apache Kafka, Apache Flume, RabbitMQ, and others. For the sake |
| of this example, the data streams are simply generated using the |
| <code>generateStock</code> method:</p> |
| <div class="codetabs" markdown="1"> |
| <div data-lang="scala" markdown="1"> |
| <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-scala" data-lang="scala"><span class="line"><span class="cl"><span class="k">val</span> <span class="n">symbols</span> <span class="k">=</span> <span class="nc">List</span><span class="o">(</span><span class="s">"SPX"</span><span class="o">,</span> <span class="s">"FTSE"</span><span class="o">,</span> <span class="s">"DJI"</span><span class="o">,</span> <span class="s">"DJT"</span><span class="o">,</span> <span class="s">"BUX"</span><span class="o">,</span> <span class="s">"DAX"</span><span class="o">,</span> <span class="s">"GOOG"</span><span class="o">)</span> |
| </span></span><span class="line"><span class="cl"> |
| </span></span><span class="line"><span class="cl"><span class="k">case</span> <span class="k">class</span> <span class="nc">StockPrice</span><span class="o">(</span><span class="n">symbol</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">price</span><span class="k">:</span> <span class="kt">Double</span><span class="o">)</span> |
| </span></span><span class="line"><span class="cl"> |
| </span></span><span class="line"><span class="cl"><span class="k">def</span> <span class="n">generateStock</span><span class="o">(</span><span class="n">symbol</span><span class="k">:</span> <span class="kt">String</span><span class="o">)(</span><span class="n">sigma</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)(</span><span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[</span><span class="kt">StockPrice</span><span class="o">])</span> <span class="k">=</span> <span class="o">{</span> |
| </span></span><span class="line"><span class="cl"> <span class="k">var</span> <span class="n">price</span> <span class="k">=</span> <span class="mf">1000.</span> |
| </span></span><span class="line"><span class="cl"> <span class="k">while</span> <span class="o">(</span><span class="kc">true</span><span class="o">)</span> <span class="o">{</span> |
| </span></span><span class="line"><span class="cl"> <span class="n">price</span> <span class="k">=</span> <span class="n">price</span> <span class="o">+</span> <span class="nc">Random</span><span class="o">.</span><span class="n">nextGaussian</span> <span class="o">*</span> <span class="n">sigma</span> |
| </span></span><span class="line"><span class="cl"> <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="nc">StockPrice</span><span class="o">(</span><span class="n">symbol</span><span class="o">,</span> <span class="n">price</span><span class="o">))</span> |
| </span></span><span class="line"><span class="cl"> <span class="nc">Thread</span><span class="o">.</span><span class="n">sleep</span><span class="o">(</span><span class="nc">Random</span><span class="o">.</span><span class="n">nextInt</span><span class="o">(</span><span class="mi">200</span><span class="o">))</span> |
| </span></span><span class="line"><span class="cl"> <span class="o">}</span> |
| </span></span><span class="line"><span class="cl"><span class="o">}</span></span></span></code></pre></div> |
| </div> |
| <div data-lang="java7" markdown="1"> |
| <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kd">private</span><span class="w"> </span><span class="kd">static</span><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="n">ArrayList</span><span class="o"><</span><span class="n">String</span><span class="o">></span><span class="w"> </span><span class="n">SYMBOLS</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">ArrayList</span><span class="o"><</span><span class="n">String</span><span class="o">></span><span class="p">(</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">Arrays</span><span class="p">.</span><span class="na">asList</span><span class="p">(</span><span class="s">"SPX"</span><span class="p">,</span><span class="w"> </span><span class="s">"FTSE"</span><span class="p">,</span><span class="w"> </span><span class="s">"DJI"</span><span class="p">,</span><span class="w"> </span><span class="s">"DJT"</span><span class="p">,</span><span class="w"> </span><span class="s">"BUX"</span><span class="p">,</span><span class="w"> </span><span class="s">"DAX"</span><span class="p">,</span><span class="w"> </span><span class="s">"GOOG"</span><span class="p">));</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">public</span><span class="w"> </span><span class="kd">static</span><span class="w"> </span><span class="kd">class</span> <span class="nc">StockPrice</span><span class="w"> </span><span class="kd">implements</span><span class="w"> </span><span class="n">Serializable</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">String</span><span class="w"> </span><span class="n">symbol</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">Double</span><span class="w"> </span><span class="n">price</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="nf">StockPrice</span><span class="p">()</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="nf">StockPrice</span><span class="p">(</span><span class="n">String</span><span class="w"> </span><span class="n">symbol</span><span class="p">,</span><span class="w"> </span><span class="n">Double</span><span class="w"> </span><span class="n">price</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">this</span><span class="p">.</span><span class="na">symbol</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">symbol</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">this</span><span class="p">.</span><span class="na">price</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">price</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">String</span><span class="w"> </span><span class="nf">toString</span><span class="p">()</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="s">"StockPrice{"</span><span class="w"> </span><span class="o">+</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s">"symbol='"</span><span class="w"> </span><span class="o">+</span><span class="w"> </span><span class="n">symbol</span><span class="w"> </span><span class="o">+</span><span class="w"> </span><span class="sc">'\''</span><span class="w"> </span><span class="o">+</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s">", count="</span><span class="w"> </span><span class="o">+</span><span class="w"> </span><span class="n">price</span><span class="w"> </span><span class="o">+</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="sc">'}'</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">public</span><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="kd">static</span><span class="w"> </span><span class="kd">class</span> <span class="nc">StockSource</span><span class="w"> </span><span class="kd">implements</span><span class="w"> </span><span class="n">SourceFunction</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="n">Double</span><span class="w"> </span><span class="n">price</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="n">String</span><span class="w"> </span><span class="n">symbol</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="n">Integer</span><span class="w"> </span><span class="n">sigma</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="nf">StockSource</span><span class="p">(</span><span class="n">String</span><span class="w"> </span><span class="n">symbol</span><span class="p">,</span><span class="w"> </span><span class="n">Integer</span><span class="w"> </span><span class="n">sigma</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">this</span><span class="p">.</span><span class="na">symbol</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">symbol</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">this</span><span class="p">.</span><span class="na">sigma</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">sigma</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">invoke</span><span class="p">(</span><span class="n">Collector</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span><span class="w"> </span><span class="n">collector</span><span class="p">)</span><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">Exception</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">price</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">DEFAULT_PRICE</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">Random</span><span class="w"> </span><span class="n">random</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">Random</span><span class="p">();</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">while</span><span class="w"> </span><span class="p">(</span><span class="kc">true</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">price</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">price</span><span class="w"> </span><span class="o">+</span><span class="w"> </span><span class="n">random</span><span class="p">.</span><span class="na">nextGaussian</span><span class="p">()</span><span class="w"> </span><span class="o">*</span><span class="w"> </span><span class="n">sigma</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">collector</span><span class="p">.</span><span class="na">collect</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">StockPrice</span><span class="p">(</span><span class="n">symbol</span><span class="p">,</span><span class="w"> </span><span class="n">price</span><span class="p">));</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">Thread</span><span class="p">.</span><span class="na">sleep</span><span class="p">(</span><span class="n">random</span><span class="p">.</span><span class="na">nextInt</span><span class="p">(</span><span class="n">200</span><span class="p">));</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span></span></span></code></pre></div> |
| </div> |
| </div> |
| <p>To read from the text socket stream, please make sure that you have a |
| socket running. For the sake of the example, executing the following |
| command in a terminal does the job. You can download |
| <a href="http://netcat.sourceforge.net/">netcat</a> if it is not available |
| on your machine.</p> |
| <pre tabindex="0"><code>nc -lk 9999 |
| </code></pre><p>If we execute the program from our IDE we see the system log followed by the |
| stock prices being generated:</p> |
| <pre tabindex="0"><code>INFO Job execution switched to status RUNNING. |
| INFO Socket Stream(1/1) switched to SCHEDULED |
| INFO Socket Stream(1/1) switched to DEPLOYING |
| INFO Custom Source(1/1) switched to SCHEDULED |
| INFO Custom Source(1/1) switched to DEPLOYING |
| … |
| 1> StockPrice{symbol='SPX', count=1011.3405732645239} |
| 2> StockPrice{symbol='SPX', count=1018.3381290039248} |
| 1> StockPrice{symbol='DJI', count=1036.7454894073978} |
| 3> StockPrice{symbol='DJI', count=1135.1170217478427} |
| 3> StockPrice{symbol='BUX', count=1053.667523187687} |
| 4> StockPrice{symbol='BUX', count=1036.552601487263} |
| </code></pre><p><a href="#top">Back to top</a></p> |
| <h2 id="window-aggregations"> |
| Window aggregations |
| <a class="anchor" href="#window-aggregations">#</a> |
| </h2> |
| <p>We first compute aggregations on time-based windows of the |
| data. Flink provides <a href="//nightlies.apache.org/flink/flink-docs-master/apis/streaming/windows.html">flexible windowing semantics</a> where windows can |
| also be defined based on the count of records or any custom user-defined |
| logic.</p> |
| <p>We partition our stream into windows of 10 seconds and slide the |
| window every 5 seconds. We compute three statistics every 5 seconds. |
| The first is the minimum price of all stocks, the second produces |
| the maximum price per stock, and the third is the mean stock price |
| (using a map window function). Aggregations and groupings can be |
| performed on named fields of POJOs, making the code more readable.</p> |
| <img alt="Basic windowing aggregations" src="/img/blog/blog_basic_window.png" width="70%" class="img-responsive center-block"> |
| <div class="codetabs" markdown="1"> |
| <div data-lang="scala" markdown="1"> |
| <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-scala" data-lang="scala"><span class="line"><span class="cl"><span class="c1">//Define the desired time window |
| </span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">val</span> <span class="n">windowedStream</span> <span class="k">=</span> <span class="n">stockStream</span> |
| </span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Time</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">10</span><span class="o">,</span> <span class="nc">SECONDS</span><span class="o">)).</span><span class="n">every</span><span class="o">(</span><span class="nc">Time</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">5</span><span class="o">,</span> <span class="nc">SECONDS</span><span class="o">))</span> |
| </span></span><span class="line"><span class="cl"> |
| </span></span><span class="line"><span class="cl"><span class="c1">//Compute some simple statistics on a rolling window |
| </span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">val</span> <span class="n">lowest</span> <span class="k">=</span> <span class="n">windowedStream</span><span class="o">.</span><span class="n">minBy</span><span class="o">(</span><span class="s">"price"</span><span class="o">)</span> |
| </span></span><span class="line"><span class="cl"><span class="k">val</span> <span class="n">maxByStock</span> <span class="k">=</span> <span class="n">windowedStream</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">"symbol"</span><span class="o">).</span><span class="n">maxBy</span><span class="o">(</span><span class="s">"price"</span><span class="o">)</span> |
| </span></span><span class="line"><span class="cl"><span class="k">val</span> <span class="n">rollingMean</span> <span class="k">=</span> <span class="n">windowedStream</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">"symbol"</span><span class="o">).</span><span class="n">mapWindow</span><span class="o">(</span><span class="n">mean</span> <span class="k">_</span><span class="o">)</span> |
| </span></span><span class="line"><span class="cl"> |
| </span></span><span class="line"><span class="cl"><span class="c1">//Compute the mean of a window |
| </span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">def</span> <span class="n">mean</span><span class="o">(</span><span class="n">ts</span><span class="k">:</span> <span class="kt">Iterable</span><span class="o">[</span><span class="kt">StockPrice</span><span class="o">],</span> <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[</span><span class="kt">StockPrice</span><span class="o">])</span> <span class="k">=</span> <span class="o">{</span> |
| </span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="o">(</span><span class="n">ts</span><span class="o">.</span><span class="n">nonEmpty</span><span class="o">)</span> <span class="o">{</span> |
| </span></span><span class="line"><span class="cl"> <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="nc">StockPrice</span><span class="o">(</span><span class="n">ts</span><span class="o">.</span><span class="n">head</span><span class="o">.</span><span class="n">symbol</span><span class="o">,</span> <span class="n">ts</span><span class="o">.</span><span class="n">foldLeft</span><span class="o">(</span><span class="mi">0</span><span class="k">:</span> <span class="kt">Double</span><span class="o">)(</span><span class="k">_</span> <span class="o">+</span> <span class="k">_</span><span class="o">.</span><span class="n">price</span><span class="o">)</span> <span class="o">/</span> <span class="n">ts</span><span class="o">.</span><span class="n">size</span><span class="o">))</span> |
| </span></span><span class="line"><span class="cl"> <span class="o">}</span> |
| </span></span><span class="line"><span class="cl"><span class="o">}</span></span></span></code></pre></div> |
| </div> |
| <div data-lang="java7" markdown="1"> |
| <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="c1">//Define the desired time window</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">WindowedDataStream</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span><span class="w"> </span><span class="n">windowedStream</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">stockStream</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">window</span><span class="p">(</span><span class="n">Time</span><span class="p">.</span><span class="na">of</span><span class="p">(</span><span class="n">10</span><span class="p">,</span><span class="w"> </span><span class="n">TimeUnit</span><span class="p">.</span><span class="na">SECONDS</span><span class="p">))</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">every</span><span class="p">(</span><span class="n">Time</span><span class="p">.</span><span class="na">of</span><span class="p">(</span><span class="n">5</span><span class="p">,</span><span class="w"> </span><span class="n">TimeUnit</span><span class="p">.</span><span class="na">SECONDS</span><span class="p">));</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">//Compute some simple statistics on a rolling window</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">DataStream</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span><span class="w"> </span><span class="n">lowest</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">windowedStream</span><span class="p">.</span><span class="na">minBy</span><span class="p">(</span><span class="s">"price"</span><span class="p">).</span><span class="na">flatten</span><span class="p">();</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">DataStream</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span><span class="w"> </span><span class="n">maxByStock</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">windowedStream</span><span class="p">.</span><span class="na">groupBy</span><span class="p">(</span><span class="s">"symbol"</span><span class="p">)</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">maxBy</span><span class="p">(</span><span class="s">"price"</span><span class="p">).</span><span class="na">flatten</span><span class="p">();</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">DataStream</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span><span class="w"> </span><span class="n">rollingMean</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">windowedStream</span><span class="p">.</span><span class="na">groupBy</span><span class="p">(</span><span class="s">"symbol"</span><span class="p">)</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">mapWindow</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">WindowMean</span><span class="p">()).</span><span class="na">flatten</span><span class="p">();</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">//Compute the mean of a window</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">public</span><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="kd">static</span><span class="w"> </span><span class="kd">class</span> <span class="nc">WindowMean</span><span class="w"> </span><span class="kd">implements</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">WindowMapFunction</span><span class="o"><</span><span class="n">StockPrice</span><span class="p">,</span><span class="w"> </span><span class="n">StockPrice</span><span class="o">></span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="n">Double</span><span class="w"> </span><span class="n">sum</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">0</span><span class="p">.</span><span class="na">0</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="n">Integer</span><span class="w"> </span><span class="n">count</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">0</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="n">String</span><span class="w"> </span><span class="n">symbol</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s">""</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">mapWindow</span><span class="p">(</span><span class="n">Iterable</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span><span class="w"> </span><span class="n">values</span><span class="p">,</span><span class="w"> </span><span class="n">Collector</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span><span class="w"> </span><span class="n">out</span><span class="p">)</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">Exception</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">if</span><span class="w"> </span><span class="p">(</span><span class="n">values</span><span class="p">.</span><span class="na">iterator</span><span class="p">().</span><span class="na">hasNext</span><span class="p">())</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">for</span><span class="w"> </span><span class="p">(</span><span class="n">StockPrice</span><span class="w"> </span><span class="n">sp</span><span class="w"> </span><span class="p">:</span><span class="w"> </span><span class="n">values</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">sum</span><span class="w"> </span><span class="o">+=</span><span class="w"> </span><span class="n">sp</span><span class="p">.</span><span class="na">price</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">symbol</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">sp</span><span class="p">.</span><span class="na">symbol</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">count</span><span class="o">++</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">out</span><span class="p">.</span><span class="na">collect</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">StockPrice</span><span class="p">(</span><span class="n">symbol</span><span class="p">,</span><span class="w"> </span><span class="n">sum</span><span class="w"> </span><span class="o">/</span><span class="w"> </span><span class="n">count</span><span class="p">));</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span></span></span></code></pre></div> |
| </div> |
| </div> |
| <p>Let us note that to print a windowed stream one has to flatten it first, |
| thus getting rid of the windowing logic. For example, execute |
| <code>maxByStock.flatten().print()</code> to print the stream of maximum prices of |
| the time windows by stock. In Scala, <code>flatten()</code> is called implicitly |
| when needed.</p> |
| <p><a href="#top">Back to top</a></p> |
| <h2 id="data-driven-windows"> |
| Data-driven windows |
| <a class="anchor" href="#data-driven-windows">#</a> |
| </h2> |
| <p>The most interesting event in the stream is when the price of a stock |
| is changing rapidly. We can send a warning when a stock price changes |
| by more than 5% since the last warning. To do that, we use a delta-based window, which takes a |
| threshold that determines when the computation is triggered, a function to |
| compute the difference, and a default value with which the first record |
| is compared. We also create a <code>Count</code> data type to count the warnings |
| every 30 seconds.</p> |
| <img alt="Data-driven windowing semantics" src="/img/blog/blog_data_driven.png" width="100%" class="img-responsive center-block"> |
| <div class="codetabs" markdown="1"> |
| <div data-lang="scala" markdown="1"> |
| <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-scala" data-lang="scala"><span class="line"><span class="cl"><span class="k">case</span> <span class="k">class</span> <span class="nc">Count</span><span class="o">(</span><span class="n">symbol</span><span class="k">:</span> <span class="kt">String</span><span class="o">,</span> <span class="n">count</span><span class="k">:</span> <span class="kt">Int</span><span class="o">)</span> |
| </span></span><span class="line"><span class="cl"><span class="k">val</span> <span class="n">defaultPrice</span> <span class="k">=</span> <span class="nc">StockPrice</span><span class="o">(</span><span class="s">""</span><span class="o">,</span> <span class="mi">1000</span><span class="o">)</span> |
| </span></span><span class="line"><span class="cl"> |
| </span></span><span class="line"><span class="cl"><span class="c1">//Use delta policy to create price change warnings |
| </span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">val</span> <span class="n">priceWarnings</span> <span class="k">=</span> <span class="n">stockStream</span><span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">"symbol"</span><span class="o">)</span> |
| </span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Delta</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mf">0.05</span><span class="o">,</span> <span class="n">priceChange</span><span class="o">,</span> <span class="n">defaultPrice</span><span class="o">))</span> |
| </span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">mapWindow</span><span class="o">(</span><span class="n">sendWarning</span> <span class="k">_</span><span class="o">)</span> |
| </span></span><span class="line"><span class="cl"> |
| </span></span><span class="line"><span class="cl"><span class="c1">//Count the number of warnings every half a minute |
| </span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">val</span> <span class="n">warningsPerStock</span> <span class="k">=</span> <span class="n">priceWarnings</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="nc">Count</span><span class="o">(</span><span class="k">_</span><span class="o">,</span> <span class="mi">1</span><span class="o">))</span> |
| </span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">"symbol"</span><span class="o">)</span> |
| </span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Time</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="nc">SECONDS</span><span class="o">))</span> |
| </span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">sum</span><span class="o">(</span><span class="s">"count"</span><span class="o">)</span> |
| </span></span><span class="line"><span class="cl"> |
| </span></span><span class="line"><span class="cl"><span class="k">def</span> <span class="n">priceChange</span><span class="o">(</span><span class="n">p1</span><span class="k">:</span> <span class="kt">StockPrice</span><span class="o">,</span> <span class="n">p2</span><span class="k">:</span> <span class="kt">StockPrice</span><span class="o">)</span><span class="k">:</span> <span class="kt">Double</span> <span class="o">=</span> <span class="o">{</span> |
| </span></span><span class="line"><span class="cl"> <span class="nc">Math</span><span class="o">.</span><span class="n">abs</span><span class="o">(</span><span class="n">p1</span><span class="o">.</span><span class="n">price</span> <span class="o">/</span> <span class="n">p2</span><span class="o">.</span><span class="n">price</span> <span class="o">-</span> <span class="mi">1</span><span class="o">)</span> |
| </span></span><span class="line"><span class="cl"><span class="o">}</span> |
| </span></span><span class="line"><span class="cl"> |
| </span></span><span class="line"><span class="cl"><span class="k">def</span> <span class="n">sendWarning</span><span class="o">(</span><span class="n">ts</span><span class="k">:</span> <span class="kt">Iterable</span><span class="o">[</span><span class="kt">StockPrice</span><span class="o">],</span> <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[</span><span class="kt">String</span><span class="o">])</span> <span class="k">=</span> <span class="o">{</span> |
| </span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="o">(</span><span class="n">ts</span><span class="o">.</span><span class="n">nonEmpty</span><span class="o">)</span> <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="n">ts</span><span class="o">.</span><span class="n">head</span><span class="o">.</span><span class="n">symbol</span><span class="o">)</span> |
| </span></span><span class="line"><span class="cl"><span class="o">}</span></span></span></code></pre></div> |
| </div> |
| <div data-lang="java7" markdown="1"> |
| <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kd">private</span><span class="w"> </span><span class="kd">static</span><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="n">Double</span><span class="w"> </span><span class="n">DEFAULT_PRICE</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">1000</span><span class="p">.;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">private</span><span class="w"> </span><span class="kd">static</span><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="n">StockPrice</span><span class="w"> </span><span class="n">DEFAULT_STOCK_PRICE</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">StockPrice</span><span class="p">(</span><span class="s">""</span><span class="p">,</span><span class="w"> </span><span class="n">DEFAULT_PRICE</span><span class="p">);</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">//Use delta policy to create price change warnings</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">DataStream</span><span class="o"><</span><span class="n">String</span><span class="o">></span><span class="w"> </span><span class="n">priceWarnings</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">stockStream</span><span class="p">.</span><span class="na">groupBy</span><span class="p">(</span><span class="s">"symbol"</span><span class="p">)</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">window</span><span class="p">(</span><span class="n">Delta</span><span class="p">.</span><span class="na">of</span><span class="p">(</span><span class="n">0</span><span class="p">.</span><span class="na">05</span><span class="p">,</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">DeltaFunction</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span><span class="p">()</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">double</span><span class="w"> </span><span class="nf">getDelta</span><span class="p">(</span><span class="n">StockPrice</span><span class="w"> </span><span class="n">oldDataPoint</span><span class="p">,</span><span class="w"> </span><span class="n">StockPrice</span><span class="w"> </span><span class="n">newDataPoint</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="n">Math</span><span class="p">.</span><span class="na">abs</span><span class="p">(</span><span class="n">oldDataPoint</span><span class="p">.</span><span class="na">price</span><span class="w"> </span><span class="o">-</span><span class="w"> </span><span class="n">newDataPoint</span><span class="p">.</span><span class="na">price</span><span class="p">);</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">},</span><span class="w"> </span><span class="n">DEFAULT_STOCK_PRICE</span><span class="p">))</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">.</span><span class="na">mapWindow</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">SendWarning</span><span class="p">()).</span><span class="na">flatten</span><span class="p">();</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">//Count the number of warnings every half a minute</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">DataStream</span><span class="o"><</span><span class="n">Count</span><span class="o">></span><span class="w"> </span><span class="n">warningsPerStock</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">priceWarnings</span><span class="p">.</span><span class="na">map</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">MapFunction</span><span class="o"><</span><span class="n">String</span><span class="p">,</span><span class="w"> </span><span class="n">Count</span><span class="o">></span><span class="p">()</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">Count</span><span class="w"> </span><span class="nf">map</span><span class="p">(</span><span class="n">String</span><span class="w"> </span><span class="n">value</span><span class="p">)</span><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">Exception</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">Count</span><span class="p">(</span><span class="n">value</span><span class="p">,</span><span class="w"> </span><span class="n">1</span><span class="p">);</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}).</span><span class="na">groupBy</span><span class="p">(</span><span class="s">"symbol"</span><span class="p">).</span><span class="na">window</span><span class="p">(</span><span class="n">Time</span><span class="p">.</span><span class="na">of</span><span class="p">(</span><span class="n">30</span><span class="p">,</span><span class="w"> </span><span class="n">TimeUnit</span><span class="p">.</span><span class="na">SECONDS</span><span class="p">)).</span><span class="na">sum</span><span class="p">(</span><span class="s">"count"</span><span class="p">).</span><span class="na">flatten</span><span class="p">();</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">public</span><span class="w"> </span><span class="kd">static</span><span class="w"> </span><span class="kd">class</span> <span class="nc">Count</span><span class="w"> </span><span class="kd">implements</span><span class="w"> </span><span class="n">Serializable</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">String</span><span class="w"> </span><span class="n">symbol</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">Integer</span><span class="w"> </span><span class="n">count</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="nf">Count</span><span class="p">()</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="nf">Count</span><span class="p">(</span><span class="n">String</span><span class="w"> </span><span class="n">symbol</span><span class="p">,</span><span class="w"> </span><span class="n">Integer</span><span class="w"> </span><span class="n">count</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">this</span><span class="p">.</span><span class="na">symbol</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">symbol</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">this</span><span class="p">.</span><span class="na">count</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">count</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">String</span><span class="w"> </span><span class="nf">toString</span><span class="p">()</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="s">"Count{"</span><span class="w"> </span><span class="o">+</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s">"symbol='"</span><span class="w"> </span><span class="o">+</span><span class="w"> </span><span class="n">symbol</span><span class="w"> </span><span class="o">+</span><span class="w"> </span><span class="sc">'\''</span><span class="w"> </span><span class="o">+</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s">", count="</span><span class="w"> </span><span class="o">+</span><span class="w"> </span><span class="n">count</span><span class="w"> </span><span class="o">+</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="sc">'}'</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">public</span><span class="w"> </span><span class="kd">static</span><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="kd">class</span> <span class="nc">SendWarning</span><span class="w"> </span><span class="kd">implements</span><span class="w"> </span><span class="n">MapWindowFunction</span><span class="o"><</span><span class="n">StockPrice</span><span class="p">,</span><span class="w"> </span><span class="n">String</span><span class="o">></span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">mapWindow</span><span class="p">(</span><span class="n">Iterable</span><span class="o"><</span><span class="n">StockPrice</span><span class="o">></span><span class="w"> </span><span class="n">values</span><span class="p">,</span><span class="w"> </span><span class="n">Collector</span><span class="o"><</span><span class="n">String</span><span class="o">></span><span class="w"> </span><span class="n">out</span><span class="p">)</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">Exception</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">if</span><span class="w"> </span><span class="p">(</span><span class="n">values</span><span class="p">.</span><span class="na">iterator</span><span class="p">().</span><span class="na">hasNext</span><span class="p">())</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">out</span><span class="p">.</span><span class="na">collect</span><span class="p">(</span><span class="n">values</span><span class="p">.</span><span class="na">iterator</span><span class="p">().</span><span class="na">next</span><span class="p">().</span><span class="na">symbol</span><span class="p">);</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span></span></span></code></pre></div> |
| </div> |
| </div> |
| <p><a href="#top">Back to top</a></p> |
| <h2 id="combining-with-a-twitter-stream"> |
| Combining with a Twitter stream |
| <a class="anchor" href="#combining-with-a-twitter-stream">#</a> |
| </h2> |
| <p>Next, we will read a Twitter stream and correlate it with our stock |
| price stream. Flink has support for connecting to <a href="https://nightlies.apache.org/flink/flink-docs-master/apis/streaming/connectors/twitter.html">Twitter’s |
| API</a>, |
| but for the sake of this example we generate dummy tweet data.</p> |
| <img alt="Social media analytics" src="/img/blog/blog_social_media.png" width="100%" class="img-responsive center-block"> |
| <div class="codetabs" markdown="1"> |
| <div data-lang="scala" markdown="1"> |
| <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-scala" data-lang="scala"><span class="line"><span class="cl"><span class="c1">//Read a stream of tweets |
| </span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">val</span> <span class="n">tweetStream</span> <span class="k">=</span> <span class="n">env</span><span class="o">.</span><span class="n">addSource</span><span class="o">(</span><span class="n">generateTweets</span> <span class="k">_</span><span class="o">)</span> |
| </span></span><span class="line"><span class="cl"> |
| </span></span><span class="line"><span class="cl"><span class="c1">//Extract the stock symbols |
| </span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">val</span> <span class="n">mentionedSymbols</span> <span class="k">=</span> <span class="n">tweetStream</span><span class="o">.</span><span class="n">flatMap</span><span class="o">(</span><span class="n">tweet</span> <span class="k">=></span> <span class="n">tweet</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">" "</span><span class="o">))</span> |
| </span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">toUpperCase</span><span class="o">())</span> |
| </span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">filter</span><span class="o">(</span><span class="n">symbols</span><span class="o">.</span><span class="n">contains</span><span class="o">(</span><span class="k">_</span><span class="o">))</span> |
| </span></span><span class="line"><span class="cl"> |
| </span></span><span class="line"><span class="cl"><span class="c1">//Count the extracted symbols |
| </span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">val</span> <span class="n">tweetsPerStock</span> <span class="k">=</span> <span class="n">mentionedSymbols</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="nc">Count</span><span class="o">(</span><span class="k">_</span><span class="o">,</span> <span class="mi">1</span><span class="o">))</span> |
| </span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">groupBy</span><span class="o">(</span><span class="s">"symbol"</span><span class="o">)</span> |
| </span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Time</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="nc">SECONDS</span><span class="o">))</span> |
| </span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">sum</span><span class="o">(</span><span class="s">"count"</span><span class="o">)</span> |
| </span></span><span class="line"><span class="cl"> |
| </span></span><span class="line"><span class="cl"><span class="k">def</span> <span class="n">generateTweets</span><span class="o">(</span><span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[</span><span class="kt">String</span><span class="o">])</span> <span class="k">=</span> <span class="o">{</span> |
| </span></span><span class="line"><span class="cl"> <span class="k">while</span> <span class="o">(</span><span class="kc">true</span><span class="o">)</span> <span class="o">{</span> |
| </span></span><span class="line"><span class="cl"> <span class="k">val</span> <span class="n">s</span> <span class="k">=</span> <span class="k">for</span> <span class="o">(</span><span class="n">i</span> <span class="k"><-</span> <span class="mi">1</span> <span class="n">to</span> <span class="mi">3</span><span class="o">)</span> <span class="k">yield</span> <span class="o">(</span><span class="n">symbols</span><span class="o">(</span><span class="nc">Random</span><span class="o">.</span><span class="n">nextInt</span><span class="o">(</span><span class="n">symbols</span><span class="o">.</span><span class="n">size</span><span class="o">)))</span> |
| </span></span><span class="line"><span class="cl"> <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="n">s</span><span class="o">.</span><span class="n">mkString</span><span class="o">(</span><span class="s">" "</span><span class="o">))</span> |
| </span></span><span class="line"><span class="cl"> <span class="nc">Thread</span><span class="o">.</span><span class="n">sleep</span><span class="o">(</span><span class="nc">Random</span><span class="o">.</span><span class="n">nextInt</span><span class="o">(</span><span class="mi">500</span><span class="o">))</span> |
| </span></span><span class="line"><span class="cl"> <span class="o">}</span> |
| </span></span><span class="line"><span class="cl"><span class="o">}</span></span></span></code></pre></div> |
| </div> |
| <div data-lang="java7" markdown="1"> |
| <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="c1">//Read a stream of tweets</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">DataStream</span><span class="o"><</span><span class="n">String</span><span class="o">></span><span class="w"> </span><span class="n">tweetStream</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">env</span><span class="p">.</span><span class="na">addSource</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">TweetSource</span><span class="p">());</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">//Extract the stock symbols</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">DataStream</span><span class="o"><</span><span class="n">String</span><span class="o">></span><span class="w"> </span><span class="n">mentionedSymbols</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">tweetStream</span><span class="p">.</span><span class="na">flatMap</span><span class="p">(</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">FlatMapFunction</span><span class="o"><</span><span class="n">String</span><span class="p">,</span><span class="w"> </span><span class="n">String</span><span class="o">></span><span class="p">()</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">flatMap</span><span class="p">(</span><span class="n">String</span><span class="w"> </span><span class="n">value</span><span class="p">,</span><span class="w"> </span><span class="n">Collector</span><span class="o"><</span><span class="n">String</span><span class="o">></span><span class="w"> </span><span class="n">out</span><span class="p">)</span><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">Exception</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">String</span><span class="o">[]</span><span class="w"> </span><span class="n">words</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">value</span><span class="p">.</span><span class="na">split</span><span class="p">(</span><span class="s">" "</span><span class="p">);</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">for</span><span class="w"> </span><span class="p">(</span><span class="n">String</span><span class="w"> </span><span class="n">word</span><span class="w"> </span><span class="p">:</span><span class="w"> </span><span class="n">words</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">out</span><span class="p">.</span><span class="na">collect</span><span class="p">(</span><span class="n">word</span><span class="p">.</span><span class="na">toUpperCase</span><span class="p">());</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}).</span><span class="na">filter</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">FilterFunction</span><span class="o"><</span><span class="n">String</span><span class="o">></span><span class="p">()</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">boolean</span><span class="w"> </span><span class="nf">filter</span><span class="p">(</span><span class="n">String</span><span class="w"> </span><span class="n">value</span><span class="p">)</span><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">Exception</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="n">SYMBOLS</span><span class="p">.</span><span class="na">contains</span><span class="p">(</span><span class="n">value</span><span class="p">);</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">});</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">//Count the extracted symbols</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Count</span><span class="o">&gt;</span><span class="w"> </span><span class="n">tweetsPerStock</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">mentionedSymbols</span><span class="p">.</span><span class="na">map</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="p">,</span><span class="w"> </span><span class="n">Count</span><span class="o">&gt;</span><span class="p">()</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">Count</span><span class="w"> </span><span class="nf">map</span><span class="p">(</span><span class="n">String</span><span class="w"> </span><span class="n">value</span><span class="p">)</span><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">Exception</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">Count</span><span class="p">(</span><span class="n">value</span><span class="p">,</span><span class="w"> </span><span class="n">1</span><span class="p">);</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}).</span><span class="na">groupBy</span><span class="p">(</span><span class="s">"symbol"</span><span class="p">).</span><span class="na">window</span><span class="p">(</span><span class="n">Time</span><span class="p">.</span><span class="na">of</span><span class="p">(</span><span class="n">30</span><span class="p">,</span><span class="w"> </span><span class="n">TimeUnit</span><span class="p">.</span><span class="na">SECONDS</span><span class="p">)).</span><span class="na">sum</span><span class="p">(</span><span class="s">"count"</span><span class="p">).</span><span class="na">flatten</span><span class="p">();</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">public</span><span class="w"> </span><span class="kd">static</span><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="kd">class</span> <span class="nc">TweetSource</span><span class="w"> </span><span class="kd">implements</span><span class="w"> </span><span class="n">SourceFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">Random</span><span class="w"> </span><span class="n">random</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">StringBuilder</span><span class="w"> </span><span class="n">stringBuilder</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">invoke</span><span class="p">(</span><span class="n">Collector</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="w"> </span><span class="n">collector</span><span class="p">)</span><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">Exception</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">random</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">Random</span><span class="p">();</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">stringBuilder</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">StringBuilder</span><span class="p">();</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">while</span><span class="w"> </span><span class="p">(</span><span class="kc">true</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">stringBuilder</span><span class="p">.</span><span class="na">setLength</span><span class="p">(</span><span class="n">0</span><span class="p">);</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">for</span><span class="w"> </span><span class="p">(</span><span class="kt">int</span><span class="w"> </span><span class="n">i</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">0</span><span class="p">;</span><span class="w"> </span><span class="n">i</span><span class="w"> </span><span class="o">&lt;</span><span class="w"> </span><span class="n">3</span><span class="p">;</span><span class="w"> </span><span class="n">i</span><span class="o">++</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">stringBuilder</span><span class="p">.</span><span class="na">append</span><span class="p">(</span><span class="s">" "</span><span class="p">);</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">stringBuilder</span><span class="p">.</span><span class="na">append</span><span class="p">(</span><span class="n">SYMBOLS</span><span class="p">.</span><span class="na">get</span><span class="p">(</span><span class="n">random</span><span class="p">.</span><span class="na">nextInt</span><span class="p">(</span><span class="n">SYMBOLS</span><span class="p">.</span><span class="na">size</span><span class="p">())));</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">collector</span><span class="p">.</span><span class="na">collect</span><span class="p">(</span><span class="n">stringBuilder</span><span class="p">.</span><span class="na">toString</span><span class="p">());</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">Thread</span><span class="p">.</span><span class="na">sleep</span><span class="p">(</span><span class="n">500</span><span class="p">);</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span></span></span></code></pre></div> |
| </div> |
| </div> |
| <p><a href="#top">Back to top</a></p> |
| <h2 id="streaming-joins"> |
| Streaming joins |
| <a class="anchor" href="#streaming-joins">#</a> |
| </h2> |
| <p>Finally, we join real-time tweets and stock prices and compute a |
| rolling correlation between the number of price warnings and the |
| number of mentions of a given stock in the Twitter stream. As both of |
| these data streams are potentially infinite, we apply the join on a |
| 30-second window.</p> |
| <img alt="Streaming joins" src="/img/blog/blog_stream_join.png" width="60%" class="img-responsive center-block"> |
| <div class="codetabs" markdown="1"> |
| <div data-lang="scala" markdown="1"> |
| <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-scala" data-lang="scala"><span class="line"><span class="cl"><span class="c1">//Join warnings and parsed tweets |
| </span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">val</span> <span class="n">tweetsAndWarning</span> <span class="k">=</span> <span class="n">warningsPerStock</span><span class="o">.</span><span class="n">join</span><span class="o">(</span><span class="n">tweetsPerStock</span><span class="o">)</span> |
| </span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">onWindow</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="nc">SECONDS</span><span class="o">)</span> |
| </span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">where</span><span class="o">(</span><span class="s">"symbol"</span><span class="o">)</span> |
| </span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">equalTo</span><span class="o">(</span><span class="s">"symbol"</span><span class="o">)</span> <span class="o">{</span> <span class="o">(</span><span class="n">c1</span><span class="o">,</span> <span class="n">c2</span><span class="o">)</span> <span class="k">=></span> <span class="o">(</span><span class="n">c1</span><span class="o">.</span><span class="n">count</span><span class="o">,</span> <span class="n">c2</span><span class="o">.</span><span class="n">count</span><span class="o">)</span> <span class="o">}</span> |
| </span></span><span class="line"><span class="cl"> |
| </span></span><span class="line"><span class="cl"><span class="k">val</span> <span class="n">rollingCorrelation</span> <span class="k">=</span> <span class="n">tweetsAndWarning</span><span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Time</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="mi">30</span><span class="o">,</span> <span class="nc">SECONDS</span><span class="o">))</span> |
| </span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">mapWindow</span><span class="o">(</span><span class="n">computeCorrelation</span> <span class="k">_</span><span class="o">)</span> |
| </span></span><span class="line"><span class="cl"> |
| </span></span><span class="line"><span class="cl"><span class="n">rollingCorrelation</span> <span class="n">print</span> |
| </span></span><span class="line"><span class="cl"> |
| </span></span><span class="line"><span class="cl"><span class="c1">//Compute rolling correlation |
| </span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">def</span> <span class="n">computeCorrelation</span><span class="o">(</span><span class="n">input</span><span class="k">:</span> <span class="kt">Iterable</span><span class="o">[(</span><span class="kt">Int</span>, <span class="kt">Int</span><span class="o">)],</span> <span class="n">out</span><span class="k">:</span> <span class="kt">Collector</span><span class="o">[</span><span class="kt">Double</span><span class="o">])</span> <span class="k">=</span> <span class="o">{</span> |
| </span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="o">(</span><span class="n">input</span><span class="o">.</span><span class="n">nonEmpty</span><span class="o">)</span> <span class="o">{</span> |
| </span></span><span class="line"><span class="cl"> <span class="k">val</span> <span class="n">var1</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">_1</span><span class="o">)</span> |
| </span></span><span class="line"><span class="cl"> <span class="k">val</span> <span class="n">mean1</span> <span class="k">=</span> <span class="n">average</span><span class="o">(</span><span class="n">var1</span><span class="o">)</span> |
| </span></span><span class="line"><span class="cl"> <span class="k">val</span> <span class="n">var2</span> <span class="k">=</span> <span class="n">input</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="k">_</span><span class="o">.</span><span class="n">_2</span><span class="o">)</span> |
| </span></span><span class="line"><span class="cl"> <span class="k">val</span> <span class="n">mean2</span> <span class="k">=</span> <span class="n">average</span><span class="o">(</span><span class="n">var2</span><span class="o">)</span> |
| </span></span><span class="line"><span class="cl"> |
| </span></span><span class="line"><span class="cl"> <span class="k">val</span> <span class="n">cov</span> <span class="k">=</span> <span class="n">average</span><span class="o">(</span><span class="n">var1</span><span class="o">.</span><span class="n">zip</span><span class="o">(</span><span class="n">var2</span><span class="o">).</span><span class="n">map</span><span class="o">(</span><span class="n">xy</span> <span class="k">=></span> <span class="o">(</span><span class="n">xy</span><span class="o">.</span><span class="n">_1</span> <span class="o">-</span> <span class="n">mean1</span><span class="o">)</span> <span class="o">*</span> <span class="o">(</span><span class="n">xy</span><span class="o">.</span><span class="n">_2</span> <span class="o">-</span> <span class="n">mean2</span><span class="o">)))</span> |
| </span></span><span class="line"><span class="cl"> <span class="k">val</span> <span class="n">d1</span> <span class="k">=</span> <span class="nc">Math</span><span class="o">.</span><span class="n">sqrt</span><span class="o">(</span><span class="n">average</span><span class="o">(</span><span class="n">var1</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">x</span> <span class="k">=></span> <span class="nc">Math</span><span class="o">.</span><span class="n">pow</span><span class="o">((</span><span class="n">x</span> <span class="o">-</span> <span class="n">mean1</span><span class="o">),</span> <span class="mi">2</span><span class="o">))))</span> |
| </span></span><span class="line"><span class="cl"> <span class="k">val</span> <span class="n">d2</span> <span class="k">=</span> <span class="nc">Math</span><span class="o">.</span><span class="n">sqrt</span><span class="o">(</span><span class="n">average</span><span class="o">(</span><span class="n">var2</span><span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="n">x</span> <span class="k">=></span> <span class="nc">Math</span><span class="o">.</span><span class="n">pow</span><span class="o">((</span><span class="n">x</span> <span class="o">-</span> <span class="n">mean2</span><span class="o">),</span> <span class="mi">2</span><span class="o">))))</span> |
| </span></span><span class="line"><span class="cl"> |
| </span></span><span class="line"><span class="cl"> <span class="n">out</span><span class="o">.</span><span class="n">collect</span><span class="o">(</span><span class="n">cov</span> <span class="o">/</span> <span class="o">(</span><span class="n">d1</span> <span class="o">*</span> <span class="n">d2</span><span class="o">))</span> |
| </span></span><span class="line"><span class="cl"> <span class="o">}</span> |
| </span></span><span class="line"><span class="cl"><span class="o">}</span></span></span></code></pre></div> |
| </div> |
| <div data-lang="java7" markdown="1"> |
| <div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="c1">//Join warnings and parsed tweets</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="p">,</span><span class="w"> </span><span class="n">Integer</span><span class="o">&gt;&gt;</span><span class="w"> </span><span class="n">tweetsAndWarning</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">warningsPerStock</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">join</span><span class="p">(</span><span class="n">tweetsPerStock</span><span class="p">)</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">onWindow</span><span class="p">(</span><span class="n">30</span><span class="p">,</span><span class="w"> </span><span class="n">TimeUnit</span><span class="p">.</span><span class="na">SECONDS</span><span class="p">)</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">where</span><span class="p">(</span><span class="s">"symbol"</span><span class="p">)</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">equalTo</span><span class="p">(</span><span class="s">"symbol"</span><span class="p">)</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">with</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">JoinFunction</span><span class="o">&lt;</span><span class="n">Count</span><span class="p">,</span><span class="w"> </span><span class="n">Count</span><span class="p">,</span><span class="w"> </span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="p">,</span><span class="w"> </span><span class="n">Integer</span><span class="o">&gt;&gt;</span><span class="p">()</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="p">,</span><span class="w"> </span><span class="n">Integer</span><span class="o">&gt;</span><span class="w"> </span><span class="nf">join</span><span class="p">(</span><span class="n">Count</span><span class="w"> </span><span class="n">first</span><span class="p">,</span><span class="w"> </span><span class="n">Count</span><span class="w"> </span><span class="n">second</span><span class="p">)</span><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">Exception</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="p">,</span><span class="w"> </span><span class="n">Integer</span><span class="o">&gt;</span><span class="p">(</span><span class="n">first</span><span class="p">.</span><span class="na">count</span><span class="p">,</span><span class="w"> </span><span class="n">second</span><span class="p">.</span><span class="na">count</span><span class="p">);</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">});</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">//Compute rolling correlation</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">&gt;</span><span class="w"> </span><span class="n">rollingCorrelation</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">tweetsAndWarning</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">window</span><span class="p">(</span><span class="n">Time</span><span class="p">.</span><span class="na">of</span><span class="p">(</span><span class="n">30</span><span class="p">,</span><span class="w"> </span><span class="n">TimeUnit</span><span class="p">.</span><span class="na">SECONDS</span><span class="p">))</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">mapWindow</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">WindowCorrelation</span><span class="p">());</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">rollingCorrelation</span><span class="p">.</span><span class="na">print</span><span class="p">();</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">public</span><span class="w"> </span><span class="kd">static</span><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="kd">class</span> <span class="nc">WindowCorrelation</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">implements</span><span class="w"> </span><span class="n">WindowMapFunction</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="p">,</span><span class="w"> </span><span class="n">Integer</span><span class="o">&gt;</span><span class="p">,</span><span class="w"> </span><span class="n">Double</span><span class="o">&gt;</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="n">Integer</span><span class="w"> </span><span class="n">leftSum</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="n">Integer</span><span class="w"> </span><span class="n">rightSum</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="n">Integer</span><span class="w"> </span><span class="n">count</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="n">Double</span><span class="w"> </span><span class="n">leftMean</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="n">Double</span><span class="w"> </span><span class="n">rightMean</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="n">Double</span><span class="w"> </span><span class="n">cov</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="n">Double</span><span class="w"> </span><span class="n">leftSd</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="n">Double</span><span class="w"> </span><span class="n">rightSd</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">mapWindow</span><span class="p">(</span><span class="n">Iterable</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="p">,</span><span class="w"> </span><span class="n">Integer</span><span class="o">&gt;&gt;</span><span class="w"> </span><span class="n">values</span><span class="p">,</span><span class="w"> </span><span class="n">Collector</span><span class="o">&lt;</span><span class="n">Double</span><span class="o">&gt;</span><span class="w"> </span><span class="n">out</span><span class="p">)</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">Exception</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">leftSum</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">0</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">rightSum</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">0</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">count</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">0</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">cov</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">0</span><span class="p">.;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">leftSd</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">0</span><span class="p">.;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">rightSd</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">0</span><span class="p">.;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">//compute mean for both sides, save count</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">for</span><span class="w"> </span><span class="p">(</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Integer</span><span class="p">,</span><span class="w"> </span><span class="n">Integer</span><span class="o">&gt;</span><span class="w"> </span><span class="n">pair</span><span class="w"> </span><span class="p">:</span><span class="w"> </span><span class="n">values</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">leftSum</span><span class="w"> </span><span class="o">+=</span><span class="w"> </span><span class="n">pair</span><span class="p">.</span><span class="na">f0</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">rightSum</span><span class="w"> </span><span class="o">+=</span><span class="w"> </span><span class="n">pair</span><span class="p">.</span><span class="na">f1</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">count</span><span class="o">++</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">leftMean</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">leftSum</span><span class="p">.</span><span class="na">doubleValue</span><span class="p">()</span><span class="w"> </span><span class="o">/</span><span class="w"> </span><span class="n">count</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">rightMean</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">rightSum</span><span class="p">.</span><span class="na">doubleValue</span><span class="p">()</span><span class="w"> </span><span class="o">/</span><span class="w"> </span><span class="n">count</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">//compute covariance & std. deviations</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">for</span><span class="w"> </span><span class="p">(</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="p">,</span><span class="w"> </span><span class="n">Integer</span><span class="o">></span><span class="w"> </span><span class="n">pair</span><span class="w"> </span><span class="p">:</span><span class="w"> </span><span class="n">values</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">cov</span><span class="w"> </span><span class="o">+=</span><span class="w"> </span><span class="p">(</span><span class="n">pair</span><span class="p">.</span><span class="na">f0</span><span class="w"> </span><span class="o">-</span><span class="w"> </span><span class="n">leftMean</span><span class="p">)</span><span class="w"> </span><span class="o">*</span><span class="w"> </span><span class="p">(</span><span class="n">pair</span><span class="p">.</span><span class="na">f1</span><span class="w"> </span><span class="o">-</span><span class="w"> </span><span class="n">rightMean</span><span class="p">)</span><span class="w"> </span><span class="o">/</span><span class="w"> </span><span class="n">count</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">for</span><span class="w"> </span><span class="p">(</span><span class="n">Tuple2</span><span class="o"><</span><span class="n">Integer</span><span class="p">,</span><span class="w"> </span><span class="n">Integer</span><span class="o">></span><span class="w"> </span><span class="n">pair</span><span class="w"> </span><span class="p">:</span><span class="w"> </span><span class="n">values</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">leftSd</span><span class="w"> </span><span class="o">+=</span><span class="w"> </span><span class="n">Math</span><span class="p">.</span><span class="na">pow</span><span class="p">(</span><span class="n">pair</span><span class="p">.</span><span class="na">f0</span><span class="w"> </span><span class="o">-</span><span class="w"> </span><span class="n">leftMean</span><span class="p">,</span><span class="w"> </span><span class="n">2</span><span class="p">)</span><span class="w"> </span><span class="o">/</span><span class="w"> </span><span class="n">count</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">rightSd</span><span class="w"> </span><span class="o">+=</span><span class="w"> </span><span class="n">Math</span><span class="p">.</span><span class="na">pow</span><span class="p">(</span><span class="n">pair</span><span class="p">.</span><span class="na">f1</span><span class="w"> </span><span class="o">-</span><span class="w"> </span><span class="n">rightMean</span><span class="p">,</span><span class="w"> </span><span class="n">2</span><span class="p">)</span><span class="w"> </span><span class="o">/</span><span class="w"> </span><span class="n">count</span><span class="p">;</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">leftSd</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">Math</span><span class="p">.</span><span class="na">sqrt</span><span class="p">(</span><span class="n">leftSd</span><span class="p">);</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">rightSd</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">Math</span><span class="p">.</span><span class="na">sqrt</span><span class="p">(</span><span class="n">rightSd</span><span class="p">);</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">out</span><span class="p">.</span><span class="na">collect</span><span class="p">(</span><span class="n">cov</span><span class="w"> </span><span class="o">/</span><span class="w"> </span><span class="p">(</span><span class="n">leftSd</span><span class="w"> </span><span class="o">*</span><span class="w"> </span><span class="n">rightSd</span><span class="p">));</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> |
| </span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span></span></span></code></pre></div> |
| </div> |
| </div> |
| <p><a href="#top">Back to top</a></p> |
| <h2 id="other-things-to-try"> |
| Other things to try |
| <a class="anchor" href="#other-things-to-try">#</a> |
| </h2> |
| <p>For a full feature overview please check the <a href="//nightlies.apache.org/flink/flink-docs-master/apis/streaming/index.html">Streaming Guide</a>, which describes all the available API features. |
| You are very welcome to try out our features for different use cases; we are looking forward to hearing about your experiences. Feel free to <a href="http://flink.apache.org/community.html#mailing-lists">contact us</a>.</p> |
| <h2 id="upcoming-for-streaming"> |
| Upcoming for streaming |
| <a class="anchor" href="#upcoming-for-streaming">#</a> |
| </h2> |
| <p>There are some aspects of Flink Streaming that are subject to |
| change by the next release, making this application look even nicer.</p> |
| <p>Stay tuned for later blog posts on how Flink Streaming works |
| internally, fault tolerance, and performance measurements!</p> |
| <p><a href="#top">Back to top</a></p> |
| </p> |
| </article> |
| |
| |
| |
| |
| |
| |
| |
| <div class="edit-this-page"> |
| <p> |
| <a href="https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications">Want to contribute translation?</a> |
| </p> |
| <p> |
| <a href="//github.com/apache/flink-web/edit/asf-site/docs/content/posts/2015-02-09-streaming-example.md"> |
| Edit This Page<i class="fa fa-edit fa-fw"></i> |
| </a> |
| </p> |
| </div> |
| |
| </section> |
| |
| <aside class="book-toc"> |
| |
| |
| |
| <nav id="TableOfContents"><h3>On This Page <a href="javascript:void(0)" class="toc" onclick="collapseToc()"><i class="fa fa-times" aria-hidden="true"></i></a></h3> |
| <ul> |
| <li> |
| <ul> |
| <li><a href="#reading-from-multiple-inputs">Reading from multiple inputs</a></li> |
| <li><a href="#window-aggregations">Window aggregations</a></li> |
| <li><a href="#data-driven-windows">Data-driven windows</a></li> |
| <li><a href="#combining-with-a-twitter-stream">Combining with a Twitter stream</a></li> |
| <li><a href="#streaming-joins">Streaming joins</a></li> |
| <li><a href="#other-things-to-try">Other things to try</a></li> |
| <li><a href="#upcoming-for-streaming">Upcoming for streaming</a></li> |
| </ul> |
| </li> |
| </ul> |
| </nav> |
| |
| |
| </aside> |
| <aside class="expand-toc hidden"> |
| <a class="toc" onclick="expandToc()" href="javascript:void(0)"> |
| <i class="fa fa-bars" aria-hidden="true"></i> |
| </a> |
| </aside> |
| |
| </main> |
| |
| <footer> |
| |
| |
| |
| <div class="separator"></div> |
| <div class="panels"> |
| <div class="wrapper"> |
| <div class="panel"> |
| <ul> |
| <li> |
| <a href="https://flink-packages.org/">flink-packages.org</a> |
| </li> |
| <li> |
| <a href="https://www.apache.org/">Apache Software Foundation</a> |
| </li> |
| <li> |
| <a href="https://www.apache.org/licenses/">License</a> |
| </li> |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| <li> |
| <a href="/zh/"> |
| <i class="fa fa-globe" aria-hidden="true"></i> 中文版 |
| </a> |
| </li> |
| |
| |
| |
| </ul> |
| </div> |
| <div class="panel"> |
| <ul> |
| <li> |
| <a href="/what-is-flink/security">Security</a> |
| </li> |
| <li> |
| <a href="https://www.apache.org/foundation/sponsorship.html">Donate</a> |
| </li> |
| <li> |
| <a href="https://www.apache.org/foundation/thanks.html">Thanks</a> |
| </li> |
| </ul> |
| </div> |
| <div class="panel icons"> |
| <div> |
| <a href="/posts"> |
| <div class="icon flink-blog-icon"></div> |
| <span>Flink blog</span> |
| </a> |
| </div> |
| <div> |
| <a href="https://github.com/apache/flink"> |
| <div class="icon flink-github-icon"></div> |
| <span>GitHub</span> |
| </a> |
| </div> |
| <div> |
| <a href="https://twitter.com/apacheflink"> |
| <div class="icon flink-twitter-icon"></div> |
| <span>Twitter</span> |
| </a> |
| </div> |
| </div> |
| </div> |
| </div> |
| |
| <hr/> |
| |
| <div class="container disclaimer"> |
| <p>The contents of this website are © 2024 Apache Software Foundation under the terms of the Apache License v2. Apache Flink, Flink, and the Flink logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.</p> |
| </div> |
| |
| |
| |
| </footer> |
| |
| </body> |
| </html> |
| |
| |
| |
| |
| |
| |