blob: 85d0f658408105f8e8aeb2d3a9f150dcfbef5c8e [file] [log] [blame]
<!DOCTYPE html>
<html lang="en" dir="ltr">
<head>
<link rel="stylesheet" href="/bootstrap/css/bootstrap.min.css">
<script src="/bootstrap/js/bootstrap.bundle.min.js"></script>
<link rel="stylesheet" type="text/css" href="/font-awesome/css/font-awesome.min.css">
<script src="/js/anchor.min.js"></script>
<script src="/js/flink.js"></script>
<link rel="canonical" href="https://flink.apache.org/2020/07/28/flink-sql-demo-building-an-end-to-end-streaming-application/">
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="description" content="Apache Flink 1.11 has released many exciting new features, including many developments in Flink SQL which is evolving at a fast pace. This article takes a closer look at how to quickly build streaming applications with Flink SQL from a practical point of view.
In the following sections, we describe how to integrate Kafka, MySQL, Elasticsearch, and Kibana with Flink SQL to analyze e-commerce user behavior in real-time. All exercises in this blogpost are performed in the Flink SQL CLI, and the entire process uses standard SQL syntax, without a single line of Java/Scala code or IDE installation.">
<meta name="theme-color" content="#FFFFFF"><meta property="og:title" content="Flink SQL Demo: Building an End-to-End Streaming Application" />
<meta property="og:description" content="Apache Flink 1.11 has released many exciting new features, including many developments in Flink SQL which is evolving at a fast pace. This article takes a closer look at how to quickly build streaming applications with Flink SQL from a practical point of view.
In the following sections, we describe how to integrate Kafka, MySQL, Elasticsearch, and Kibana with Flink SQL to analyze e-commerce user behavior in real-time. All exercises in this blogpost are performed in the Flink SQL CLI, and the entire process uses standard SQL syntax, without a single line of Java/Scala code or IDE installation." />
<meta property="og:type" content="article" />
<meta property="og:url" content="https://flink.apache.org/2020/07/28/flink-sql-demo-building-an-end-to-end-streaming-application/" /><meta property="article:section" content="posts" />
<meta property="article:published_time" content="2020-07-28T12:00:00+00:00" />
<meta property="article:modified_time" content="2020-07-28T12:00:00+00:00" />
<title>Flink SQL Demo: Building an End-to-End Streaming Application | Apache Flink</title>
<link rel="manifest" href="/manifest.json">
<link rel="icon" href="/favicon.png" type="image/x-icon">
<link rel="stylesheet" href="/book.min.22eceb4d17baa9cdc0f57345edd6f215a40474022dfee39b63befb5fb3c596b5.css" integrity="sha256-IuzrTRe6qc3A9XNF7dbyFaQEdAIt/uObY777X7PFlrU=">
<script defer src="/en.search.min.baf635ab0e127f80152dd1da4b524a5dea67cb9cc0feb21710b5188ada9c15c1.js" integrity="sha256-uvY1qw4Sf4AVLdHaS1JKXepny5zA/rIXELUYitqcFcE="></script>
<!--
Made with Book Theme
https://github.com/alex-shpak/hugo-book
-->
<meta name="generator" content="Hugo 0.124.1">
<script>
// Analytics bootstrap: queues tracker commands on window._paq and then
// injects the tracker script asynchronously.
// Reuse the queue already stored on window._paq if there is one; otherwise start with an empty array.
var _paq = window._paq = window._paq || [];
// Each command is pushed as an array of [commandName, ...arguments].
// NOTE(review): presumably the matomo.js script loaded below consumes these in order — confirm against the Matomo docs.
_paq.push(['disableCookies']);
// Domains passed to the tracker: the Flink website and the Flink path on the nightlies docs host.
_paq.push(["setDomains", ["*.flink.apache.org","*.nightlies.apache.org/flink"]]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
// The function wrapper keeps u, d, g and s out of the global scope.
(function() {
// Protocol-relative base URL of the analytics host, so requests use the same scheme as the page.
var u="//analytics.apache.org/";
_paq.push(['setTrackerUrl', u+'matomo.php']);
_paq.push(['setSiteId', '1']);
// Build a <script> element for matomo.js and insert it before the first <script> in the document.
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
})();
</script>
</head>
<body dir="ltr">
<header>
<nav class="navbar navbar-expand-xl">
<div class="container-fluid">
<a class="navbar-brand" href="/">
<img src="/img/logo/png/100/flink_squirrel_100_color.png" alt="Apache Flink" height="47" width="47" class="d-inline-block align-text-middle">
<span>Apache Flink</span>
</a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent" aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
<i class="fa fa-bars navbar-toggler-icon"></i>
</button>
<div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav">
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">About</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/what-is-flink/flink-architecture/">Architecture</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-applications/">Applications</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-operations/">Operations</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/use-cases/">Use Cases</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/powered-by/">Powered By</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/roadmap/">Roadmap</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/community/">Community & Project Info</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/security/">Security</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/special-thanks/">Special Thanks</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Getting Started</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/">With Flink<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/docs/try-flink-kubernetes-operator/quick-start/">With Flink Kubernetes Operator<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable/docs/get-started/introduction/">With Flink CDC<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/docs/try-flink-ml/quick-start/">With Flink ML<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/getting-started/project-setup.html">With Flink Stateful Functions<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/overview/">Training Course<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Documentation</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/">Flink 1.19 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-master/">Flink Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/">Kubernetes Operator 1.8 (latest)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main">Kubernetes Operator Main (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable">CDC 3.0 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-master">CDC Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/">ML 2.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-master">ML Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/">Stateful Functions 3.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-master">Stateful Functions Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">How to Contribute</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/how-to-contribute/overview/">Overview</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-code/">Contribute Code</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/reviewing-prs/">Review Pull Requests</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/code-style-and-quality-preamble/">Code Style and Quality Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-documentation/">Contribute Documentation</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/documentation-style-guide/">Documentation Style Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/improve-website/">Contribute to the Website</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/getting-help/">Getting Help</a>
</li>
</ul>
</li>
<li class="nav-item">
<a class="nav-link" href="/posts/">Flink Blog</a>
</li>
<li class="nav-item">
<a class="nav-link" href="/downloads/">Downloads</a>
</li>
</ul>
<div class="book-search">
<div class="book-search-spinner hidden">
<i class="fa fa-refresh fa-spin"></i>
</div>
<form class="search-bar d-flex" onsubmit="return false;">
<input type="text" id="book-search-input" placeholder="Search" aria-label="Search" maxlength="64" data-hotkeys="s/">
<i class="fa fa-search search"></i>
<i class="fa fa-circle-o-notch fa-spin spinner"></i>
</form>
<div class="book-search-spinner hidden"></div>
<ul id="book-search-results"></ul>
</div>
</div>
</div>
</nav>
<div class="navbar-clearfix"></div>
</header>
<main class="flex">
<section class="container book-page">
<article class="markdown">
<h1>
<a href="/2020/07/28/flink-sql-demo-building-an-end-to-end-streaming-application/">Flink SQL Demo: Building an End-to-End Streaming Application</a>
</h1>
July 28, 2020 -
Jark Wu
<a href="https://twitter.com/JarkWu">(@JarkWu)</a>
<p><p>Apache Flink 1.11 has released many exciting new features, including many developments in Flink SQL which is evolving at a fast pace. This article takes a closer look at how to quickly build streaming applications with Flink SQL from a practical point of view.</p>
<p>In the following sections, we describe how to integrate Kafka, MySQL, Elasticsearch, and Kibana with Flink SQL to analyze e-commerce user behavior in real-time. All exercises in this blogpost are performed in the Flink SQL CLI, and the entire process uses standard SQL syntax, without a single line of Java/Scala code or IDE installation. The final result of this demo is shown in the following figure:</p>
<center>
<img src="/img/blog/2020-07-28-flink-sql-demo/image1.gif" width="650px" alt="Demo Overview"/>
</center>
<br>
<h1 id="preparation">
Preparation
<a class="anchor" href="#preparation">#</a>
</h1>
<p>Prepare a Linux or MacOS computer with Docker installed.</p>
<h2 id="starting-the-demo-environment">
Starting the Demo Environment
<a class="anchor" href="#starting-the-demo-environment">#</a>
</h2>
<p>The components required in this demo are all managed in containers, so we will use <code>docker-compose</code> to start them. First, download the <code>docker-compose.yml</code> file that defines the demo environment, for example by running the following commands:</p>
<pre tabindex="0"><code>mkdir flink-sql-demo; cd flink-sql-demo;
wget https://raw.githubusercontent.com/wuchong/flink-sql-demo/v1.11-EN/docker-compose.yml
</code></pre><p>The Docker Compose environment consists of the following containers:</p>
<ul>
<li><strong>Flink SQL CLI:</strong> used to submit queries and visualize their results.</li>
<li><strong>Flink Cluster:</strong> a Flink JobManager and a Flink TaskManager container to execute queries.</li>
<li><strong>MySQL:</strong> MySQL 5.7 and a pre-populated <code>category</code> table in the database. The <code>category</code> table will be joined with data in Kafka to enrich the real-time data.</li>
<li><strong>Kafka:</strong> mainly used as a data source. The DataGen component automatically writes data into a Kafka topic.</li>
<li><strong>Zookeeper:</strong> this component is required by Kafka.</li>
<li><strong>Elasticsearch:</strong> mainly used as a data sink.</li>
<li><strong>Kibana:</strong> used to visualize the data in Elasticsearch.</li>
<li><strong>DataGen:</strong> the data generator. After the container is started, user behavior data is automatically generated and sent to the Kafka topic. By default, 2000 data entries are generated each second for about 1.5 hours. You can modify DataGen&rsquo;s <code>speedup</code> parameter in <code>docker-compose.yml</code> to adjust the generation rate (which takes effect after Docker Compose is restarted).</li>
</ul>
<div class="alert alert-danger" markdown="1">
<span class="label label-danger" style="display: inline-block"> Note </span>
Before starting the containers, we recommend configuring Docker so that sufficient resources are available and the environment does not become unresponsive. We suggest running Docker at 3-4 GB memory and 3-4 CPU cores.
</div>
<p>To start all containers, run the following command in the directory that contains the <code>docker-compose.yml</code> file.</p>
<pre tabindex="0"><code>docker-compose up -d
</code></pre><p>This command automatically starts all the containers defined in the Docker Compose configuration in a detached mode. Run <code>docker ps</code> to check whether the 9 containers are running properly. You can also visit <a href="http://localhost:5601/">http://localhost:5601/</a> to see if Kibana is running normally.</p>
<p>Don’t forget to run the following command to stop all containers after you have finished the tutorial:</p>
<pre tabindex="0"><code>docker-compose down
</code></pre><h2 id="entering-the-flink-sql-cli-client">
Entering the Flink SQL CLI client
<a class="anchor" href="#entering-the-flink-sql-cli-client">#</a>
</h2>
<p>To enter the SQL CLI client run:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-bash" data-lang="bash"><span class="line"><span class="cl">docker-compose <span class="nb">exec</span> sql-client ./sql-client.sh
</span></span></code></pre></div><p>The command starts the SQL CLI client in the container.
You should see the welcome screen of the CLI client.</p>
<center>
<img src="/img/blog/2020-07-28-flink-sql-demo/image3.png" width="500px" alt="Flink SQL CLI welcome page"/>
</center>
<br>
<h2 id="creating-a-kafka-table-using-ddl">
Creating a Kafka table using DDL
<a class="anchor" href="#creating-a-kafka-table-using-ddl">#</a>
</h2>
<p>The DataGen container continuously writes events into the Kafka <code>user_behavior</code> topic. This data contains the user behavior on the day of November 27, 2017 (behaviors include “click”, “like”, “purchase” and “add to shopping cart” events). Each row represents a user behavior event, with the user ID, product ID, product category ID, event type, and timestamp in JSON format. Note that the dataset is from the <a href="https://tianchi.aliyun.com/dataset/dataDetail?dataId=649">Alibaba Cloud Tianchi public dataset</a>.</p>
<p>In the directory that contains <code>docker-compose.yml</code>, run the following command to view the first 10 data entries generated in the Kafka topic:</p>
<pre tabindex="0"><code>docker-compose exec kafka bash -c &#39;kafka-console-consumer.sh --topic user_behavior --bootstrap-server kafka:9094 --from-beginning --max-messages 10&#39;
{&#34;user_id&#34;: &#34;952483&#34;, &#34;item_id&#34;:&#34;310884&#34;, &#34;category_id&#34;: &#34;4580532&#34;, &#34;behavior&#34;: &#34;pv&#34;, &#34;ts&#34;: &#34;2017-11-27T00:00:00Z&#34;}
{&#34;user_id&#34;: &#34;794777&#34;, &#34;item_id&#34;:&#34;5119439&#34;, &#34;category_id&#34;: &#34;982926&#34;, &#34;behavior&#34;: &#34;pv&#34;, &#34;ts&#34;: &#34;2017-11-27T00:00:00Z&#34;}
...
</code></pre><p>In order to make the events in the Kafka topic accessible to Flink SQL, we run the following DDL statement in SQL CLI to create a table that connects to the topic in the Kafka cluster:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sql" data-lang="sql"><span class="line"><span class="cl"><span class="k">CREATE</span><span class="w"> </span><span class="k">TABLE</span><span class="w"> </span><span class="n">user_behavior</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">user_id</span><span class="w"> </span><span class="nb">BIGINT</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">item_id</span><span class="w"> </span><span class="nb">BIGINT</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">category_id</span><span class="w"> </span><span class="nb">BIGINT</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">behavior</span><span class="w"> </span><span class="n">STRING</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">ts</span><span class="w"> </span><span class="k">TIMESTAMP</span><span class="p">(</span><span class="mi">3</span><span class="p">),</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">proctime</span><span class="w"> </span><span class="k">AS</span><span class="w"> </span><span class="n">PROCTIME</span><span class="p">(),</span><span class="w"> </span><span class="c1">-- generates processing-time attribute using computed column
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="w"> </span><span class="n">WATERMARK</span><span class="w"> </span><span class="k">FOR</span><span class="w"> </span><span class="n">ts</span><span class="w"> </span><span class="k">AS</span><span class="w"> </span><span class="n">ts</span><span class="w"> </span><span class="o">-</span><span class="w"> </span><span class="nb">INTERVAL</span><span class="w"> </span><span class="s1">&#39;5&#39;</span><span class="w"> </span><span class="k">SECOND</span><span class="w"> </span><span class="c1">-- defines watermark on ts column, marks ts as event-time attribute
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="p">)</span><span class="w"> </span><span class="k">WITH</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;connector&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;kafka&#39;</span><span class="p">,</span><span class="w"> </span><span class="c1">-- using kafka connector
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="w"> </span><span class="s1">&#39;topic&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;user_behavior&#39;</span><span class="p">,</span><span class="w"> </span><span class="c1">-- kafka topic
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="w"> </span><span class="s1">&#39;scan.startup.mode&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;earliest-offset&#39;</span><span class="p">,</span><span class="w"> </span><span class="c1">-- reading from the beginning
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="w"> </span><span class="s1">&#39;properties.bootstrap.servers&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;kafka:9094&#39;</span><span class="p">,</span><span class="w"> </span><span class="c1">-- kafka broker address
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="w"> </span><span class="s1">&#39;format&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;json&#39;</span><span class="w"> </span><span class="c1">-- the data format is json
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><p>The above snippet declares five fields based on the data format. In addition, it uses the computed column syntax and the built-in <code>PROCTIME()</code> function to declare a virtual column that generates the processing-time attribute. It also uses the <code>WATERMARK</code> syntax to declare the watermark strategy on the <code>ts</code> field (tolerating events that arrive up to 5 seconds out of order). Therefore, the <code>ts</code> field becomes an event-time attribute. For more information about time attributes and DDL syntax, see the following official documents:</p>
<ul>
<li><a href="https://nightlies.apache.org/flink/flink-docs-release-1.11/dev/table/streaming/time_attributes.html">Time attributes in Flink’s Table API &amp; SQL</a></li>
<li><a href="https://nightlies.apache.org/flink/flink-docs-release-1.11/dev/table/sql/create.html#create-table">DDL Syntax in Flink SQL</a></li>
</ul>
<p>After creating the <code>user_behavior</code> table in the SQL CLI, run <code>SHOW TABLES;</code> and <code>DESCRIBE user_behavior;</code> to see registered tables and table details. Also, run the command <code>SELECT * FROM user_behavior;</code> directly in the SQL CLI to preview the data (press <code>q</code> to exit).</p>
<p>Next, we discover more about Flink SQL through three real-world scenarios.</p>
<h1 id="hourly-trading-volume">
Hourly Trading Volume
<a class="anchor" href="#hourly-trading-volume">#</a>
</h1>
<h2 id="creating-an-elasticsearch-table-using-ddl">
Creating an Elasticsearch table using DDL
<a class="anchor" href="#creating-an-elasticsearch-table-using-ddl">#</a>
</h2>
<p>Let’s create an Elasticsearch result table in the SQL CLI. We need two columns in this case: <code>hour_of_day</code> and <code>buy_cnt</code> (trading volume).</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sql" data-lang="sql"><span class="line"><span class="cl"><span class="k">CREATE</span><span class="w"> </span><span class="k">TABLE</span><span class="w"> </span><span class="n">buy_cnt_per_hour</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">hour_of_day</span><span class="w"> </span><span class="nb">BIGINT</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">buy_cnt</span><span class="w"> </span><span class="nb">BIGINT</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">)</span><span class="w"> </span><span class="k">WITH</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;connector&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;elasticsearch-7&#39;</span><span class="p">,</span><span class="w"> </span><span class="c1">-- using elasticsearch connector
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="w"> </span><span class="s1">&#39;hosts&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;http://elasticsearch:9200&#39;</span><span class="p">,</span><span class="w"> </span><span class="c1">-- elasticsearch address
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="w"> </span><span class="s1">&#39;index&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;buy_cnt_per_hour&#39;</span><span class="w"> </span><span class="c1">-- elasticsearch index name, similar to database table name
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><p>There is no need to create the <code>buy_cnt_per_hour</code> index in Elasticsearch in advance since Elasticsearch will automatically create the index if it does not exist.</p>
<h2 id="submitting-a-query">
Submitting a Query
<a class="anchor" href="#submitting-a-query">#</a>
</h2>
<p>The hourly trading volume is the number of &ldquo;buy&rdquo; behaviors completed each hour. Therefore, we can use a <code>TUMBLE</code> window function to assign data into hourly windows. Then, we count the number of “buy” records in each window. To implement this, we can first keep only the &ldquo;buy&rdquo; records and then apply <code>COUNT(*)</code>.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sql" data-lang="sql"><span class="line"><span class="cl"><span class="k">INSERT</span><span class="w"> </span><span class="k">INTO</span><span class="w"> </span><span class="n">buy_cnt_per_hour</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">SELECT</span><span class="w"> </span><span class="n">HOUR</span><span class="p">(</span><span class="n">TUMBLE_START</span><span class="p">(</span><span class="n">ts</span><span class="p">,</span><span class="w"> </span><span class="nb">INTERVAL</span><span class="w"> </span><span class="s1">&#39;1&#39;</span><span class="w"> </span><span class="n">HOUR</span><span class="p">)),</span><span class="w"> </span><span class="k">COUNT</span><span class="p">(</span><span class="o">*</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">FROM</span><span class="w"> </span><span class="n">user_behavior</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">WHERE</span><span class="w"> </span><span class="n">behavior</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;buy&#39;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">GROUP</span><span class="w"> </span><span class="k">BY</span><span class="w"> </span><span class="n">TUMBLE</span><span class="p">(</span><span class="n">ts</span><span class="p">,</span><span class="w"> </span><span class="nb">INTERVAL</span><span class="w"> </span><span class="s1">&#39;1&#39;</span><span class="w"> </span><span class="n">HOUR</span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><p>Here, we use the built-in <code>HOUR</code> function to extract the value for each hour in the day from a <code>TIMESTAMP</code> column. Use <code>INSERT INTO</code> to start a Flink SQL job that continuously writes results into the Elasticsearch <code>buy_cnt_per_hour</code> index. The Elasticsearch result table can be seen as a materialized view of the query. You can find more information about Flink’s window aggregation in the <a href="https://nightlies.apache.org/flink/flink-docs-release-1.11/dev/table/sql/queries.html#group-windows">Apache Flink documentation</a>.</p>
<p>After running the previous query in the Flink SQL CLI, we can observe the submitted task on the <a href="http://localhost:8081">Flink Web UI</a>. This task is a streaming task and therefore runs continuously.</p>
<center>
<img src="/img/blog/2020-07-28-flink-sql-demo/image4.jpg" width="800px" alt="Flink Dashboard"/>
</center>
<br>
<h2 id="using-kibana-to-visualize-results">
Using Kibana to Visualize Results
<a class="anchor" href="#using-kibana-to-visualize-results">#</a>
</h2>
<p>Access Kibana at <a href="http://localhost:5601">http://localhost:5601</a>. First, configure an index pattern by clicking &ldquo;Management&rdquo; in the left-side toolbar and find &ldquo;Index Patterns&rdquo;. Next, click &ldquo;Create Index Pattern&rdquo; and enter the full index name <code>buy_cnt_per_hour</code> to create the index pattern. After creating the index pattern, we can explore data in Kibana.</p>
<div class="alert alert-info" markdown="1">
<span class="label label-info" style="display: inline-block"><span class="glyphicon glyphicon-info-sign" aria-hidden="true"></span> Note </span>
Since we are using a TUMBLE window of one hour here, it might take about four minutes from the time the containers start until the first row is emitted. Until then, the index does not exist and Kibana is unable to find it.
</div>
<p>Click &ldquo;Discover&rdquo; in the left-side toolbar. Kibana lists the content of the created index.</p>
<center>
<img src="/img/blog/2020-07-28-flink-sql-demo/image5.jpg" width="800px" alt="Kibana Discover"/>
</center>
<br>
<p>Next, create a dashboard to display various views. Click &ldquo;Dashboard&rdquo; on the left side of the page to create a dashboard named &ldquo;User Behavior Analysis&rdquo;. Then, click &ldquo;Create New&rdquo; to create a new view. Select &ldquo;Area&rdquo; (area graph), then select the <code>buy_cnt_per_hour</code> index, and draw the trading volume area chart as illustrated in the configuration on the left side of the following diagram. Apply the changes by clicking the “▶” play button. Then, save it as &ldquo;Hourly Trading Volume&rdquo;.</p>
<center>
<img src="/img/blog/2020-07-28-flink-sql-demo/image6.jpg" width="800px" alt="Hourly Trading Volume"/>
</center>
<br>
<p>You can see that during the early morning hours the number of transactions is at its lowest for the entire day.</p>
<p>As real-time data is added into the indices, you can enable auto-refresh in Kibana to see real-time visualization changes and updates. You can do so by clicking the time picker and entering a refresh interval (e.g. 3 seconds) in the “Refresh every” field.</p>
<h1 id="cumulative-number-of-unique-visitors-every-10-min">
Cumulative number of Unique Visitors every 10-min
<a class="anchor" href="#cumulative-number-of-unique-visitors-every-10-min">#</a>
</h1>
<p>Another interesting visualization is the cumulative number of unique visitors (UV). For example, the number of UV at 10:00 represents the total number of UV from 00:00 to 10:00. Therefore, the curve is monotonically increasing.</p>
<p>Let’s create another Elasticsearch table in the SQL CLI to store the UV results. This table contains 3 columns: date, time and cumulative UVs.
The <code>date_str</code> and <code>time_str</code> columns are defined as the primary key; the Elasticsearch sink will use them to calculate the document ID and work in upsert mode to update UV values under the document ID.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sql" data-lang="sql"><span class="line"><span class="cl"><span class="k">CREATE</span><span class="w"> </span><span class="k">TABLE</span><span class="w"> </span><span class="n">cumulative_uv</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">date_str</span><span class="w"> </span><span class="n">STRING</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">time_str</span><span class="w"> </span><span class="n">STRING</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">uv</span><span class="w"> </span><span class="nb">BIGINT</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">PRIMARY</span><span class="w"> </span><span class="k">KEY</span><span class="w"> </span><span class="p">(</span><span class="n">date_str</span><span class="p">,</span><span class="w"> </span><span class="n">time_str</span><span class="p">)</span><span class="w"> </span><span class="k">NOT</span><span class="w"> </span><span class="n">ENFORCED</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">)</span><span class="w"> </span><span class="k">WITH</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;connector&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;elasticsearch-7&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;hosts&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;http://elasticsearch:9200&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;index&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;cumulative_uv&#39;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><p>We can extract the date and time using the <code>DATE_FORMAT</code> function based on the <code>ts</code> field. As the section title describes, we only need to report every 10 minutes. So, we can use <code>SUBSTR</code> and the string concatenation operator <code>||</code> to convert the time value into a 10-minute interval time string, such as <code>12:00</code>, <code>12:10</code>.
Next, we group data by <code>date_str</code> and perform a <code>COUNT DISTINCT</code> aggregation on <code>user_id</code> to get the current cumulative UV for this day. Additionally, we perform a <code>MAX</code> aggregation on the <code>time_str</code> field to get the current stream time: the maximum event time observed so far.
As the maximum time is also a part of the primary key of the sink, the final result is that we will insert a new point into Elasticsearch every 10 minutes, and the latest point will be updated continuously until the next 10-minute point is generated.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sql" data-lang="sql"><span class="line"><span class="cl"><span class="k">INSERT</span><span class="w"> </span><span class="k">INTO</span><span class="w"> </span><span class="n">cumulative_uv</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">SELECT</span><span class="w"> </span><span class="n">date_str</span><span class="p">,</span><span class="w"> </span><span class="k">MAX</span><span class="p">(</span><span class="n">time_str</span><span class="p">),</span><span class="w"> </span><span class="k">COUNT</span><span class="p">(</span><span class="k">DISTINCT</span><span class="w"> </span><span class="n">user_id</span><span class="p">)</span><span class="w"> </span><span class="k">as</span><span class="w"> </span><span class="n">uv</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">FROM</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">SELECT</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">DATE_FORMAT</span><span class="p">(</span><span class="n">ts</span><span class="p">,</span><span class="w"> </span><span class="s1">&#39;yyyy-MM-dd&#39;</span><span class="p">)</span><span class="w"> </span><span class="k">as</span><span class="w"> </span><span class="n">date_str</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">SUBSTR</span><span class="p">(</span><span class="n">DATE_FORMAT</span><span class="p">(</span><span class="n">ts</span><span class="p">,</span><span class="w"> </span><span class="s1">&#39;HH:mm&#39;</span><span class="p">),</span><span class="mi">1</span><span class="p">,</span><span class="mi">4</span><span class="p">)</span><span class="w"> </span><span class="o">||</span><span class="w"> </span><span class="s1">&#39;0&#39;</span><span class="w"> </span><span class="k">as</span><span class="w"> </span><span class="n">time_str</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">user_id</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">FROM</span><span class="w"> </span><span class="n">user_behavior</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">GROUP</span><span class="w"> </span><span class="k">BY</span><span class="w"> </span><span class="n">date_str</span><span class="p">;</span><span class="w">
</span></span></span></code></pre></div><p>After submitting this query, we create a <code>cumulative_uv</code> index pattern in Kibana. We then create a &ldquo;Line&rdquo; (line graph) on the dashboard, by selecting the <code>cumulative_uv</code> index, and drawing the cumulative UV curve according to the configuration on the left side of the following figure before finally saving the curve.</p>
<center>
<img src="/img/blog/2020-07-28-flink-sql-demo/image7.jpg" width="800px" alt="Cumulative Unique Visitors every 10-min"/>
</center>
<br>
<h1 id="top-categories">
Top Categories
<a class="anchor" href="#top-categories">#</a>
</h1>
<p>The last visualization represents the category rankings to inform us about the most popular categories on our e-commerce site. Since our data source offers events for more than 5,000 categories without providing any additional significance to our analytics, we would like to reduce it so that it only includes the top-level categories. We will use the data in our MySQL database by joining it as a dimension table with our Kafka events to map sub-categories to top-level categories.</p>
<p>Create a table in the SQL CLI to make the data in MySQL accessible to Flink SQL.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sql" data-lang="sql"><span class="line"><span class="cl"><span class="k">CREATE</span><span class="w"> </span><span class="k">TABLE</span><span class="w"> </span><span class="n">category_dim</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">sub_category_id</span><span class="w"> </span><span class="nb">BIGINT</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">parent_category_name</span><span class="w"> </span><span class="n">STRING</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">)</span><span class="w"> </span><span class="k">WITH</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;connector&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;jdbc&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;url&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;jdbc:mysql://mysql:3306/flink&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;table-name&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;category&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;username&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;root&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;password&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;123456&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;lookup.cache.max-rows&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;5000&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;lookup.cache.ttl&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;10min&#39;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><p>The underlying JDBC connector implements the <code>LookupTableSource</code> interface, so the created JDBC table <code>category_dim</code> can be used as a temporal table (i.e. lookup table) out-of-the-box in the data enrichment.</p>
<p>In addition, create an Elasticsearch table to store the category statistics.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sql" data-lang="sql"><span class="line"><span class="cl"><span class="k">CREATE</span><span class="w"> </span><span class="k">TABLE</span><span class="w"> </span><span class="n">top_category</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">category_name</span><span class="w"> </span><span class="n">STRING</span><span class="w"> </span><span class="k">PRIMARY</span><span class="w"> </span><span class="k">KEY</span><span class="w"> </span><span class="k">NOT</span><span class="w"> </span><span class="n">ENFORCED</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">buy_cnt</span><span class="w"> </span><span class="nb">BIGINT</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">)</span><span class="w"> </span><span class="k">WITH</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;connector&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;elasticsearch-7&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;hosts&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;http://elasticsearch:9200&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;index&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;top_category&#39;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><p>In order to enrich the category names, we use Flink SQL’s temporal table joins to join a dimension table. You can access more information about <a href="https://nightlies.apache.org/flink/flink-docs-release-1.11/dev/table/streaming/joins.html#join-with-a-temporal-table">temporal joins</a> in the Flink documentation.</p>
<p>Additionally, we use the <code>CREATE VIEW</code> syntax to register the query as a logical view, allowing us to easily reference this query in subsequent queries and simplify nested queries. Please note that creating a logical view does not trigger the execution of the job and the view results are not persisted. Therefore, this statement is lightweight and does not have additional overhead.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sql" data-lang="sql"><span class="line"><span class="cl"><span class="k">CREATE</span><span class="w"> </span><span class="k">VIEW</span><span class="w"> </span><span class="n">rich_user_behavior</span><span class="w"> </span><span class="k">AS</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">SELECT</span><span class="w"> </span><span class="n">U</span><span class="p">.</span><span class="n">user_id</span><span class="p">,</span><span class="w"> </span><span class="n">U</span><span class="p">.</span><span class="n">item_id</span><span class="p">,</span><span class="w"> </span><span class="n">U</span><span class="p">.</span><span class="n">behavior</span><span class="p">,</span><span class="w"> </span><span class="k">C</span><span class="p">.</span><span class="n">parent_category_name</span><span class="w"> </span><span class="k">as</span><span class="w"> </span><span class="n">category_name</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">FROM</span><span class="w"> </span><span class="n">user_behavior</span><span class="w"> </span><span class="k">AS</span><span class="w"> </span><span class="n">U</span><span class="w"> </span><span class="k">LEFT</span><span class="w"> </span><span class="k">JOIN</span><span class="w"> </span><span class="n">category_dim</span><span class="w"> </span><span class="k">FOR</span><span class="w"> </span><span class="n">SYSTEM_TIME</span><span class="w"> </span><span class="k">AS</span><span class="w"> </span><span class="k">OF</span><span class="w"> </span><span class="n">U</span><span class="p">.</span><span class="n">proctime</span><span class="w"> </span><span class="k">AS</span><span class="w"> </span><span class="k">C</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">ON</span><span class="w"> </span><span class="n">U</span><span class="p">.</span><span class="n">category_id</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">C</span><span class="p">.</span><span class="n">sub_category_id</span><span class="p">;</span><span class="w">
</span></span></span></code></pre></div><p>Finally, we group the enriched view by category name to count the number of <code>buy</code> events and write the result to Elasticsearch’s <code>top_category</code> index.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sql" data-lang="sql"><span class="line"><span class="cl"><span class="k">INSERT</span><span class="w"> </span><span class="k">INTO</span><span class="w"> </span><span class="n">top_category</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">SELECT</span><span class="w"> </span><span class="n">category_name</span><span class="p">,</span><span class="w"> </span><span class="k">COUNT</span><span class="p">(</span><span class="o">*</span><span class="p">)</span><span class="w"> </span><span class="n">buy_cnt</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">FROM</span><span class="w"> </span><span class="n">rich_user_behavior</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">WHERE</span><span class="w"> </span><span class="n">behavior</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;buy&#39;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">GROUP</span><span class="w"> </span><span class="k">BY</span><span class="w"> </span><span class="n">category_name</span><span class="p">;</span><span class="w">
</span></span></span></code></pre></div><p>After submitting the query, we create a <code>top_category</code> index pattern in Kibana. We then create a &ldquo;Horizontal Bar&rdquo; (bar graph) on the dashboard, by selecting the <code>top_category</code> index and drawing the category ranking according to the configuration on the left side of the following diagram before finally saving the list.</p>
<center>
<img src="/img/blog/2020-07-28-flink-sql-demo/image8.jpg" width="800px" alt="Top Categories"/>
</center>
<br>
<p>As illustrated in the diagram, the categories of clothing and shoes far exceed the other categories on the e-commerce website.</p>
<hr>
<p>We have now implemented three practical applications and created charts for them. We can now return to the dashboard page and drag-and-drop each view to give our dashboard a more formal and intuitive style, as illustrated at the beginning of the blogpost. Of course, Kibana also provides a rich set of graphics and visualization features, and the user_behavior logs contain a lot more interesting information to explore. Using Flink SQL, you can analyze data in more dimensions, while using Kibana allows you to display more views and observe real-time changes in its charts!</p>
<h1 id="summary">
Summary
<a class="anchor" href="#summary">#</a>
</h1>
<p>In the previous sections, we described how to use Flink SQL to integrate Kafka, MySQL, Elasticsearch, and Kibana to quickly build a real-time analytics application. The entire process can be completed using standard SQL syntax, without a line of Java or Scala code. We hope that this article provides some clear and practical examples of the convenience and power of Flink SQL, featuring an easy connection to various external systems, native support for event time and out-of-order handling, dimension table joins and a wide range of built-in functions. We hope you have fun following the examples in this blogpost!</p>
</p>
</article>
<div class="edit-this-page">
<p>
<a href="https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications">Want to contribute translation?</a>
</p>
<p>
<a href="//github.com/apache/flink-web/edit/asf-site/docs/content/posts/2020-07-28-flink-sql-demo-building-e2e-streaming-application.md">
Edit This Page<i class="fa fa-edit fa-fw"></i>
</a>
</p>
</div>
</section>
<aside class="book-toc">
<nav id="TableOfContents"><h3>On This Page <a href="javascript:void(0)" class="toc" onclick="collapseToc()"><i class="fa fa-times" aria-hidden="true"></i></a></h3>
<ul>
<li><a href="#preparation">Preparation</a>
<ul>
<li><a href="#starting-the-demo-environment">Starting the Demo Environment</a></li>
<li><a href="#entering-the-flink-sql-cli-client">Entering the Flink SQL CLI client</a></li>
<li><a href="#creating-a-kafka-table-using-ddl">Creating a Kafka table using DDL</a></li>
</ul>
</li>
<li><a href="#hourly-trading-volume">Hourly Trading Volume</a>
<ul>
<li><a href="#creating-an-elasticsearch-table-using-ddl">Creating an Elasticsearch table using DDL</a></li>
<li><a href="#submitting-a-query">Submitting a Query</a></li>
<li><a href="#using-kibana-to-visualize-results">Using Kibana to Visualize Results</a></li>
</ul>
</li>
<li><a href="#cumulative-number-of-unique-visitors-every-10-min">Cumulative number of Unique Visitors every 10-min</a></li>
<li><a href="#top-categories">Top Categories</a></li>
<li><a href="#summary">Summary</a></li>
</ul>
</nav>
</aside>
<aside class="expand-toc hidden">
<a class="toc" onclick="expandToc()" href="javascript:void(0)">
<i class="fa fa-bars" aria-hidden="true"></i>
</a>
</aside>
</main>
<footer>
<div class="separator"></div>
<div class="panels">
<div class="wrapper">
<div class="panel">
<ul>
<li>
<a href="https://flink-packages.org/">flink-packages.org</a>
</li>
<li>
<a href="https://www.apache.org/">Apache Software Foundation</a>
</li>
<li>
<a href="https://www.apache.org/licenses/">License</a>
</li>
<li>
<a href="/zh/">
<i class="fa fa-globe" aria-hidden="true"></i>&nbsp;中文版
</a>
</li>
</ul>
</div>
<div class="panel">
<ul>
<li>
<a href="https://www.apache.org/security/">Security</a>
</li>
<li>
<a href="https://www.apache.org/foundation/sponsorship.html">Donate</a>
</li>
<li>
<a href="https://www.apache.org/foundation/thanks.html">Thanks</a>
</li>
</ul>
</div>
<div class="panel icons">
<div>
<a href="/posts">
<div class="icon flink-blog-icon"></div>
<span>Flink blog</span>
</a>
</div>
<div>
<a href="https://github.com/apache/flink">
<div class="icon flink-github-icon"></div>
<span>GitHub</span>
</a>
</div>
<div>
<a href="https://twitter.com/apacheflink">
<div class="icon flink-twitter-icon"></div>
<span>Twitter</span>
</a>
</div>
</div>
</div>
</div>
<hr/>
<div class="container disclaimer">
<p>The contents of this website are © 2024 Apache Software Foundation under the terms of the Apache License v2. Apache Flink, Flink, and the Flink logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.</p>
</div>
</footer>
</body>
</html>