<!DOCTYPE html>
<html lang="en">
<head>
<link rel="stylesheet" href="/bootstrap/css/bootstrap.min.css">
<script src="/bootstrap/js/bootstrap.bundle.min.js"></script>
<link rel="stylesheet" type="text/css" href="/font-awesome/css/font-awesome.min.css">
<script src="/js/anchor.min.js"></script>
<script src="/js/flink.js"></script>
<link rel="canonical" href="https://flink.apache.org/2017/03/29/from-streams-to-tables-and-back-again-an-update-on-flinks-table-sql-api/">
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="description" content="Stream processing can deliver a lot of value. Many organizations have recognized the benefit of managing large volumes of data in real-time, reacting quickly to trends, and providing customers with live services at scale. Streaming applications with well-defined business logic can deliver a competitive advantage.
Flink&rsquo;s DataStream abstraction is a powerful API which lets you flexibly define both basic and complex streaming pipelines. Additionally, it offers low-level operations such as Async IO and ProcessFunctions.">
<meta name="theme-color" content="#FFFFFF"><meta property="og:title" content="From Streams to Tables and Back Again: An Update on Flink&#39;s Table &amp; SQL API" />
<meta property="og:description" content="Stream processing can deliver a lot of value. Many organizations have recognized the benefit of managing large volumes of data in real-time, reacting quickly to trends, and providing customers with live services at scale. Streaming applications with well-defined business logic can deliver a competitive advantage.
Flink&rsquo;s DataStream abstraction is a powerful API which lets you flexibly define both basic and complex streaming pipelines. Additionally, it offers low-level operations such as Async IO and ProcessFunctions." />
<meta property="og:type" content="article" />
<meta property="og:url" content="https://flink.apache.org/2017/03/29/from-streams-to-tables-and-back-again-an-update-on-flinks-table-sql-api/" /><meta property="article:section" content="posts" />
<meta property="article:published_time" content="2017-03-29T12:00:00+00:00" />
<meta property="article:modified_time" content="2017-03-29T12:00:00+00:00" />
<title>From Streams to Tables and Back Again: An Update on Flink&#39;s Table & SQL API | Apache Flink</title>
<link rel="manifest" href="/manifest.json">
<link rel="icon" href="/favicon.png" type="image/x-icon">
<link rel="stylesheet" href="/book.min.22eceb4d17baa9cdc0f57345edd6f215a40474022dfee39b63befb5fb3c596b5.css" integrity="sha256-IuzrTRe6qc3A9XNF7dbyFaQEdAIt/uObY777X7PFlrU=">
<script defer src="/en.search.min.2698f0d1b683dae4d6cb071668b310a55ebcf1c48d11410a015a51d90105b53e.js" integrity="sha256-Jpjw0baD2uTWywcWaLMQpV688cSNEUEKAVpR2QEFtT4="></script>
<!--
Made with Book Theme
https://github.com/alex-shpak/hugo-book
-->
<meta name="generator" content="Hugo 0.124.1">
<script>
var _paq = window._paq = window._paq || [];
_paq.push(['disableCookies']);
_paq.push(["setDomains", ["*.flink.apache.org","*.nightlies.apache.org/flink"]]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
(function() {
var u="//analytics.apache.org/";
_paq.push(['setTrackerUrl', u+'matomo.php']);
_paq.push(['setSiteId', '1']);
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
})();
</script>
</head>
<body>
<header>
<nav class="navbar navbar-expand-xl">
<div class="container-fluid">
<a class="navbar-brand" href="/">
<img src="/img/logo/png/100/flink_squirrel_100_color.png" alt="Apache Flink" height="47" width="47" class="d-inline-block align-text-middle">
<span>Apache Flink</span>
</a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent" aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
<i class="fa fa-bars navbar-toggler-icon"></i>
</button>
<div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav">
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">About</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/what-is-flink/flink-architecture/">Architecture</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-applications/">Applications</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-operations/">Operations</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/use-cases/">Use Cases</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/powered-by/">Powered By</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/roadmap/">Roadmap</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/community/">Community & Project Info</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/security/">Security</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/special-thanks/">Special Thanks</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Getting Started</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/">With Flink<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/docs/try-flink-kubernetes-operator/quick-start/">With Flink Kubernetes Operator<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable/docs/get-started/introduction/">With Flink CDC<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/docs/try-flink-ml/quick-start/">With Flink ML<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/getting-started/project-setup.html">With Flink Stateful Functions<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/overview/">Training Course<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Documentation</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/">Flink 1.19 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-master/">Flink Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/">Kubernetes Operator 1.8 (latest)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main">Kubernetes Operator Main (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable">CDC 3.0 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-master">CDC Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/">ML 2.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-master">ML Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/">Stateful Functions 3.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-master">Stateful Functions Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">How to Contribute</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/how-to-contribute/overview/">Overview</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-code/">Contribute Code</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/reviewing-prs/">Review Pull Requests</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/code-style-and-quality-preamble/">Code Style and Quality Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-documentation/">Contribute Documentation</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/documentation-style-guide/">Documentation Style Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/improve-website/">Contribute to the Website</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/getting-help/">Getting Help</a>
</li>
</ul>
</li>
<li class="nav-item">
<a class="nav-link" href="/posts/">Flink Blog</a>
</li>
<li class="nav-item">
<a class="nav-link" href="/downloads/">Downloads</a>
</li>
</ul>
<div class="book-search">
<div class="book-search-spinner hidden">
<i class="fa fa-refresh fa-spin"></i>
</div>
<form class="search-bar d-flex" onsubmit="return false;"su>
<input type="text" id="book-search-input" placeholder="Search" aria-label="Search" maxlength="64" data-hotkeys="s/">
<i class="fa fa-search search"></i>
<i class="fa fa-circle-o-notch fa-spin spinner"></i>
</form>
<div class="book-search-spinner hidden"></div>
<ul id="book-search-results"></ul>
</div>
</div>
</div>
</nav>
<div class="navbar-clearfix"></div>
</header>
<main class="flex">
<section class="container book-page">
<article class="markdown">
<h1>
<a href="/2017/03/29/from-streams-to-tables-and-back-again-an-update-on-flinks-table-sql-api/">From Streams to Tables and Back Again: An Update on Flink&#39;s Table &amp; SQL API</a>
</h1>
March 29, 2017 -
<p><p>Stream processing can deliver a lot of value. Many organizations have recognized the benefit of managing large volumes of data in real-time, reacting quickly to trends, and providing customers with live services at scale. Streaming applications with well-defined business logic can deliver a competitive advantage.</p>
<p>Flink&rsquo;s <a href="//nightlies.apache.org/flink/flink-docs-release-1.2/dev/datastream_api.html">DataStream</a> abstraction is a powerful API that lets you flexibly define both basic and complex streaming pipelines. Additionally, it offers low-level operations such as <a href="//nightlies.apache.org/flink/flink-docs-release-1.2/dev/stream/asyncio.html">Async IO</a> and <a href="//nightlies.apache.org/flink/flink-docs-release-1.2/dev/stream/process_function.html">ProcessFunctions</a>. However, many users do not need such a deep level of flexibility. They need an API that quickly solves 80% of their use cases, where simple tasks can be defined using little code.</p>
<p>To deliver the power of stream processing to a broader set of users, the Apache Flink community is developing APIs that provide simpler abstractions and more concise syntax so that users can focus on their business logic instead of advanced streaming concepts. Along with other APIs (such as <a href="//nightlies.apache.org/flink/flink-docs-release-1.2/dev/libs/cep.html">CEP</a> for complex event processing on streams), Flink offers a relational API that aims to unify stream and batch processing: the <a href="//nightlies.apache.org/flink/flink-docs-release-1.2/dev/table_api.html">Table &amp; SQL API</a>, often referred to as the Table API.</p>
<p>Recently, contributors from companies including Alibaba, Huawei, and data Artisans decided to further develop the Table API. Over the past year, the Table API has been rewritten entirely. Since Flink 1.1, its core has been based on <a href="http://calcite.apache.org/">Apache Calcite</a>, which parses SQL and optimizes all relational queries. Today, the Table API can address a wide range of use cases in both batch and stream environments with unified semantics.</p>
<p>This blog post summarizes the current status of Flink’s Table API and showcases some of the recently added features in Apache Flink. Among the features presented here are unified access to batch and streaming data, data transformation, and window operators.
The following paragraphs not only give a general overview of the Table API, but also illustrate the potential of relational APIs in the future.</p>
<p>Because the Table API is built on top of Flink’s core APIs, <a href="//nightlies.apache.org/flink/flink-docs-release-1.2/dev/datastream_api.html">DataStreams</a> and <a href="//nightlies.apache.org/flink/flink-docs-release-1.2/dev/batch/index.html">DataSets</a> can be converted to a Table and vice versa without much overhead. In the following, we show how to create tables from different sources and specify programs that can be executed locally or in a distributed setting. In this post, we will use the Scala version of the Table API, but there is also a Java version, as well as a SQL API with an equivalent set of features.</p>
<h2 id="data-transformation-and-etl">
Data Transformation and ETL
<a class="anchor" href="#data-transformation-and-etl">#</a>
</h2>
<p>A common task in every data processing pipeline is importing data from one or multiple systems, applying some transformations to it, and then exporting the data to another system. The Table API can help to manage these recurring tasks. For reading data, the API provides a set of ready-to-use <code>TableSources</code> such as a <code>CsvTableSource</code> and <code>KafkaTableSource</code>; however, it also allows the implementation of custom <code>TableSources</code> that can hide configuration specifics (e.g. watermark generation) from users who are less familiar with streaming concepts.</p>
<p>Let’s assume we have a CSV file that stores customer information. The values are delimited by a “|”-character and contain a customer identifier, name, timestamp of the last update, and preferences encoded in a comma-separated key-value string:</p>
<pre><code>42|Bob Smith|2016-07-23 16:10:11|color=12,length=200,size=200
</code></pre>
<p>The following example illustrates how to read a CSV file and perform some data cleansing before converting it to a regular DataStream program.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-scala" data-lang="scala"><span class="line"><span class="cl"><span class="c1">// set up execution environment
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">val</span> <span class="n">env</span> <span class="k">=</span> <span class="nc">StreamExecutionEnvironment</span><span class="o">.</span><span class="n">getExecutionEnvironment</span>
</span></span><span class="line"><span class="cl"><span class="k">val</span> <span class="n">tEnv</span> <span class="k">=</span> <span class="nc">TableEnvironment</span><span class="o">.</span><span class="n">getTableEnvironment</span><span class="o">(</span><span class="n">env</span><span class="o">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="c1">// configure table source
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">val</span> <span class="n">customerSource</span> <span class="k">=</span> <span class="nc">CsvTableSource</span><span class="o">.</span><span class="n">builder</span><span class="o">()</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">path</span><span class="o">(</span><span class="s">&#34;/path/to/customer_data.csv&#34;</span><span class="o">)</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">ignoreFirstLine</span><span class="o">()</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">fieldDelimiter</span><span class="o">(</span><span class="s">&#34;|&#34;</span><span class="o">)</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">field</span><span class="o">(</span><span class="s">&#34;id&#34;</span><span class="o">,</span> <span class="nc">Types</span><span class="o">.</span><span class="nc">LONG</span><span class="o">)</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">field</span><span class="o">(</span><span class="s">&#34;name&#34;</span><span class="o">,</span> <span class="nc">Types</span><span class="o">.</span><span class="nc">STRING</span><span class="o">)</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">field</span><span class="o">(</span><span class="s">&#34;last_update&#34;</span><span class="o">,</span> <span class="nc">Types</span><span class="o">.</span><span class="nc">TIMESTAMP</span><span class="o">)</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">field</span><span class="o">(</span><span class="s">&#34;prefs&#34;</span><span class="o">,</span> <span class="nc">Types</span><span class="o">.</span><span class="nc">STRING</span><span class="o">)</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">build</span><span class="o">()</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="c1">// name your table source
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="n">tEnv</span><span class="o">.</span><span class="n">registerTableSource</span><span class="o">(</span><span class="s">&#34;customers&#34;</span><span class="o">,</span> <span class="n">customerSource</span><span class="o">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="c1">// define your table program
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">val</span> <span class="n">table</span> <span class="k">=</span> <span class="n">tEnv</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">scan</span><span class="o">(</span><span class="s">&#34;customers&#34;</span><span class="o">)</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">filter</span><span class="o">(</span>&#39;name<span class="o">.</span><span class="n">isNotNull</span> <span class="o">&amp;&amp;</span> &#39;last_update <span class="o">&gt;</span> <span class="s">&#34;2016-01-01 00:00:00&#34;</span><span class="o">.</span><span class="n">toTimestamp</span><span class="o">)</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">select</span><span class="o">(</span>&#39;id<span class="o">,</span> &#39;name<span class="o">.</span><span class="n">lowerCase</span><span class="o">(),</span> &#39;prefs<span class="o">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="c1">// convert it to a data stream
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">val</span> <span class="n">ds</span> <span class="k">=</span> <span class="n">table</span><span class="o">.</span><span class="n">toDataStream</span><span class="o">[</span><span class="kt">Row</span><span class="o">]</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="n">ds</span><span class="o">.</span><span class="n">print</span><span class="o">()</span>
</span></span><span class="line"><span class="cl"><span class="n">env</span><span class="o">.</span><span class="n">execute</span><span class="o">()</span>
</span></span></code></pre></div><p>The Table API comes with a large set of built-in functions that make it easy to specify business logic using a language-integrated query (LINQ-style) syntax. In the example above, we filter out customers with invalid names and select only those who updated their preferences recently. We convert names to lowercase for normalization. For debugging purposes, we convert the table into a DataStream and print it.</p>
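<p>For illustration, here is a small, hypothetical variation of the query above that uses a few more of the built-in expression functions, such as <code>trim()</code>; the column names match the table registered earlier, but the query itself is only a sketch:</p>
<pre><code class="language-scala">// sketch: additional built-in expressions on the same "customers" table
val cleaned = tEnv
  .scan("customers")
  .filter('name.isNotNull &amp;&amp; 'name.trim() !== "")
  .select('id, 'name.trim().lowerCase() as 'name, 'prefs)
</code></pre>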
<p>The <code>CsvTableSource</code> supports both batch and stream environments. To execute the program above as a batch application, one only has to replace the <code>StreamExecutionEnvironment</code> with an <code>ExecutionEnvironment</code> and change the output conversion from <code>DataStream</code> to <code>DataSet</code>. The Table API program itself doesn’t change.</p>
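<p>As a rough sketch (reusing the <code>customerSource</code> definition from above), the batch version of the program differs only in the environment and the final conversion:</p>
<pre><code class="language-scala">// sketch: the same table program executed in a batch environment
val batchEnv = ExecutionEnvironment.getExecutionEnvironment
val batchTableEnv = TableEnvironment.getTableEnvironment(batchEnv)

batchTableEnv.registerTableSource("customers", customerSource)

val result = batchTableEnv
  .scan("customers")
  .filter('name.isNotNull &amp;&amp; 'last_update &gt; "2016-01-01 00:00:00".toTimestamp)
  .select('id, 'name.lowerCase(), 'prefs)
  .toDataSet[Row]   // DataSet instead of DataStream

result.print()
</code></pre>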
<p>In the example, we converted the table program to a data stream of <code>Row</code> objects. However, we are not limited to row data types. The Table API supports all types from the underlying APIs such as Java and Scala Tuples, Case Classes, POJOs, or generic types that are serialized using Kryo. Let’s assume that we want to have a regular object (POJO) with the following format instead of generic rows:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-scala" data-lang="scala"><span class="line"><span class="cl"><span class="k">class</span> <span class="nc">Customer</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="k">var</span> <span class="n">id</span><span class="k">:</span> <span class="kt">Int</span> <span class="o">=</span> <span class="k">_</span>
</span></span><span class="line"><span class="cl"> <span class="k">var</span> <span class="n">name</span><span class="k">:</span> <span class="kt">String</span> <span class="o">=</span> <span class="k">_</span>
</span></span><span class="line"><span class="cl"> <span class="k">var</span> <span class="n">update</span><span class="k">:</span> <span class="kt">Long</span> <span class="o">=</span> <span class="k">_</span>
</span></span><span class="line"><span class="cl"> <span class="k">var</span> <span class="n">prefs</span><span class="k">:</span> <span class="kt">java.util.Properties</span> <span class="o">=</span> <span class="k">_</span>
</span></span><span class="line"><span class="cl"><span class="o">}</span>
</span></span></code></pre></div><p>We can use the following table program to convert the CSV file into Customer objects. Flink takes care of creating objects and mapping fields for us.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-scala" data-lang="scala"><span class="line"><span class="cl"><span class="k">val</span> <span class="n">ds</span> <span class="k">=</span> <span class="n">tEnv</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">scan</span><span class="o">(</span><span class="s">&#34;customers&#34;</span><span class="o">)</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">select</span><span class="o">(</span>&#39;id<span class="o">,</span> &#39;name<span class="o">,</span> &#39;last_update <span class="n">as</span> &#39;update<span class="o">,</span> <span class="n">parseProperties</span><span class="o">(</span>&#39;prefs<span class="o">)</span> <span class="n">as</span> &#39;prefs<span class="o">)</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">toDataStream</span><span class="o">[</span><span class="kt">Customer</span><span class="o">]</span>
</span></span></code></pre></div><p>You might have noticed that the query above uses a function to parse the preferences field. Even though Flink’s Table API ships with a large set of built-in functions, it is often necessary to define custom scalar functions. In the example above, we use the user-defined function <code>parseProperties</code>. The following code snippet shows how easily we can implement a scalar function.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-scala" data-lang="scala"><span class="line"><span class="cl"><span class="k">object</span> <span class="nc">parseProperties</span> <span class="k">extends</span> <span class="nc">ScalarFunction</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="k">def</span> <span class="n">eval</span><span class="o">(</span><span class="n">str</span><span class="k">:</span> <span class="kt">String</span><span class="o">)</span><span class="k">:</span> <span class="kt">Properties</span> <span class="o">=</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="k">val</span> <span class="n">props</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">Properties</span><span class="o">()</span>
</span></span><span class="line"><span class="cl"> <span class="n">str</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">&#34;,&#34;</span><span class="o">)</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">map</span><span class="o">(</span><span class="nc">_</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">&#34;=&#34;</span><span class="o">))</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">foreach</span><span class="o">(</span><span class="n">split</span> <span class="k">=&gt;</span> <span class="n">props</span><span class="o">.</span><span class="n">setProperty</span><span class="o">(</span><span class="n">split</span><span class="o">(</span><span class="mi">0</span><span class="o">),</span> <span class="n">split</span><span class="o">(</span><span class="mi">1</span><span class="o">)))</span>
</span></span><span class="line"><span class="cl"> <span class="n">props</span>
</span></span><span class="line"><span class="cl"> <span class="o">}</span>
</span></span><span class="line"><span class="cl"><span class="o">}</span>
</span></span></code></pre></div><p>Scalar functions can be used to deserialize, extract, or convert values (and more). By overriding the <code>open()</code> method we can even access runtime information such as distributed cache files or metrics. The <code>open()</code> method is called only once during the runtime’s <a href="//nightlies.apache.org/flink/flink-docs-release-1.3/internals/task_lifecycle.html">task lifecycle</a>.</p>
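<p>As a hedged sketch (the exact <code>FunctionContext</code> accessors are an assumption here and may differ between Flink versions), a scalar function can override <code>open()</code> to read runtime information once per task, and it can be registered under a name so that it is also usable from SQL:</p>
<pre><code class="language-scala">// sketch: a scalar function that reads a job parameter in open()
class ParsePropertiesWithDefault extends ScalarFunction {

  private var defaultColor: String = _

  override def open(context: FunctionContext): Unit = {
    // assumed accessor: read a job parameter once when the task starts
    defaultColor = context.getJobParameter("default.color", "0")
  }

  def eval(str: String): Properties = {
    val props = new Properties()
    props.setProperty("color", defaultColor)
    str.split(",").map(_.split("=")).foreach(kv =&gt; props.setProperty(kv(0), kv(1)))
    props
  }
}

// registering the function by name also makes it available in SQL queries
tEnv.registerFunction("parseProps", new ParsePropertiesWithDefault)
</code></pre>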
<h2 id="unified-windowing-for-static-and-streaming-data">
Unified Windowing for Static and Streaming Data
<a class="anchor" href="#unified-windowing-for-static-and-streaming-data">#</a>
</h2>
<p>Another very common task, especially when working with continuous data, is the definition of windows to split a stream into pieces of finite size, over which we can apply computations. At the moment, the Table API supports three types of windows: sliding windows, tumbling windows, and session windows (for general definitions of the different types of windows, we recommend <a href="//nightlies.apache.org/flink/flink-docs-release-1.2/dev/windows.html">Flink’s documentation</a>). All three window types work on <a href="//nightlies.apache.org/flink/flink-docs-release-1.2/dev/event_time.html">event or processing time</a>. Session windows can be defined over time intervals, while sliding and tumbling windows can be defined over time intervals or a number of rows.</p>
<p>Let’s assume that our customer data from the example above is an event stream of updates generated whenever a customer updates his or her preferences. We assume that the events come from a TableSource that has assigned timestamps and watermarks. The definition of a window again happens in a LINQ-style fashion. The following example could be used to count the preference updates per customer and day.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-scala" data-lang="scala"><span class="line"><span class="cl"><span class="n">table</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Tumble</span> <span class="n">over</span> <span class="mf">1.d</span><span class="n">ay</span> <span class="n">on</span> &#39;rowtime <span class="n">as</span> &#39;w<span class="o">)</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">groupBy</span><span class="o">(</span>&#39;id<span class="o">,</span> &#39;w<span class="o">)</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">select</span><span class="o">(</span>&#39;id<span class="o">,</span> &#39;w<span class="o">.</span><span class="n">start</span> <span class="n">as</span> &#39;from<span class="o">,</span> &#39;w<span class="o">.</span><span class="n">end</span> <span class="n">as</span> &#39;to<span class="o">,</span> &#39;prefs<span class="o">.</span><span class="n">count</span> <span class="n">as</span> &#39;updates<span class="o">)</span>
</span></span></code></pre></div><p>By using the <code>on()</code> parameter, we can specify whether the window is supposed to work on event-time or not. The Table API assumes that timestamps and watermarks are assigned correctly when using event-time. Elements with timestamps smaller than the last received watermark are dropped. Since the extraction of timestamps and generation of watermarks depends on the data source and requires some deeper knowledge of their origin, the TableSource or the upstream DataStream is usually responsible for assigning these properties.</p>
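<p>If the events arrive as a regular <code>DataStream</code> instead of through a <code>TableSource</code>, timestamps and watermarks can be assigned on the stream before it is handed to the Table API. The following is a minimal sketch using a bounded-out-of-orderness extractor; the tuple layout and the five-second bound are assumptions for illustration:</p>
<pre><code class="language-scala">// sketch: assign event-time timestamps and watermarks on the upstream DataStream
// tuple layout assumed: (id, name, updateTime in milliseconds, prefs)
val updates: DataStream[(Long, String, Long, String)] = env.fromElements(
  (42L, "Bob Smith", 1469290211000L, "color=12,length=200,size=200"))

val withTimestamps = updates.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[(Long, String, Long, String)](Time.seconds(5)) {
    override def extractTimestamp(e: (Long, String, Long, String)): Long = e._3
  })
// the resulting stream can then be registered as a table whose 'rowtime refers to these timestamps
</code></pre>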
<p>The following code shows how to define other types of windows:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-scala" data-lang="scala"><span class="line"><span class="cl"><span class="c1">// using processing-time
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="n">table</span><span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Tumble</span> <span class="n">over</span> <span class="mf">100.</span><span class="n">rows</span> <span class="n">as</span> &#39;manyRowWindow<span class="o">)</span>
</span></span><span class="line"><span class="cl"><span class="c1">// using event-time
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="n">table</span><span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Session</span> <span class="n">withGap</span> <span class="mf">15.</span><span class="n">minutes</span> <span class="n">on</span> &#39;rowtime <span class="n">as</span> &#39;sessionWindow<span class="o">)</span>
</span></span><span class="line"><span class="cl"><span class="n">table</span><span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Slide</span> <span class="n">over</span> <span class="mf">1.d</span><span class="n">ay</span> <span class="n">every</span> <span class="mf">1.</span><span class="n">hour</span> <span class="n">on</span> &#39;rowtime <span class="n">as</span> &#39;dailyWindow<span class="o">)</span>
</span></span></code></pre></div><p>Since batch is just a special case of streaming (where a batch happens to have a defined start and end point), it is also possible to apply all of these windows in a batch execution environment. Without any modification of the table program itself, we can run the code on a DataSet, provided that we specify a column named “rowtime”. This is particularly interesting if we want to compute exact results from time to time, so that late events that are heavily out-of-order can be included in the computation.</p>
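<p>As a sketch of the batch counterpart (the column layout and values are assumptions for illustration), the same windowed aggregation can run over a bounded <code>DataSet</code> that carries an explicit <code>rowtime</code> column:</p>
<pre><code class="language-scala">// sketch: the same tumbling-window count over a bounded DataSet
val batchEnv = ExecutionEnvironment.getExecutionEnvironment
val batchTableEnv = TableEnvironment.getTableEnvironment(batchEnv)

// a DataSet with an explicit "rowtime" column (epoch milliseconds; layout assumed)
val history = batchEnv.fromElements(
  (42L, 1469290211000L, "color=12,size=200"))
val historyTable = batchTableEnv.fromDataSet(history, 'id, 'rowtime, 'prefs)

val counts = historyTable
  .window(Tumble over 1.day on 'rowtime as 'w)
  .groupBy('id, 'w)
  .select('id, 'w.start as 'from, 'w.end as 'to, 'prefs.count as 'updates)
</code></pre>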
<p>At the moment, the Table API only supports so-called “group windows” that also exist in the DataStream API. Other windows such as SQL’s OVER clause windows are in development and <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A&#43;Table&#43;API&#43;Stream&#43;Aggregations">planned for Flink 1.3</a>.</p>
<p>In order to demonstrate the expressiveness and capabilities of the API, here’s a snippet with a more advanced example of an exponentially decaying moving average over a sliding window of one hour which returns aggregated results every second. The table program weighs recent orders more heavily than older orders. This example is borrowed from <a href="https://calcite.apache.org/docs/stream.html#hopping-windows">Apache Calcite</a> and shows what will be possible in future Flink releases for both the Table API and SQL.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-scala" data-lang="scala"><span class="line"><span class="cl"><span class="n">table</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">window</span><span class="o">(</span><span class="nc">Slide</span> <span class="n">over</span> <span class="mf">1.</span><span class="n">hour</span> <span class="n">every</span> <span class="mf">1.</span><span class="n">second</span> <span class="n">as</span> &#39;w<span class="o">)</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">groupBy</span><span class="o">(</span>&#39;productId<span class="o">,</span> &#39;w<span class="o">)</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">select</span><span class="o">(</span>
</span></span><span class="line"><span class="cl"> &#39;w<span class="o">.</span><span class="n">end</span><span class="o">,</span>
</span></span><span class="line"><span class="cl"> &#39;productId<span class="o">,</span>
</span></span><span class="line"><span class="cl"> <span class="o">(</span>&#39;unitPrice <span class="o">*</span> <span class="o">(</span>&#39;rowtime <span class="o">-</span> &#39;w<span class="o">.</span><span class="n">start</span><span class="o">).</span><span class="n">exp</span><span class="o">()</span> <span class="o">/</span> <span class="mf">1.</span><span class="n">hour</span><span class="o">).</span><span class="n">sum</span> <span class="o">/</span> <span class="o">((</span>&#39;rowtime <span class="o">-</span> &#39;w<span class="o">.</span><span class="n">start</span><span class="o">).</span><span class="n">exp</span><span class="o">()</span> <span class="o">/</span> <span class="mf">1.</span><span class="n">hour</span><span class="o">).</span><span class="n">sum</span><span class="o">)</span>
</span></span></code></pre></div><h2 id="user-defined-table-functions">
User-defined Table Functions
<a class="anchor" href="#user-defined-table-functions">#</a>
</h2>
<p><a href="//nightlies.apache.org/flink/flink-docs-release-1.2/dev/table_api.html#user-defined-table-functions">User-defined table functions</a> were added in Flink 1.2. These can be quite useful for table columns containing non-atomic values which need to be extracted and mapped to separate fields before processing. Table functions take an arbitrary number of scalar values and allow for returning an arbitrary number of rows as output instead of a single value, similar to a flatMap function in the DataStream or DataSet API. The output of a table function can then be joined with the original row in the table by using either a left-outer join or cross join.</p>
<p>Using the previously-mentioned customer table, let’s assume we want to produce a table that contains the color and size preferences as separate columns. The table program would look like this:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-scala" data-lang="scala"><span class="line"><span class="cl"><span class="c1">// create an instance of the table function
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">val</span> <span class="n">extractPrefs</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">PropertiesExtractor</span><span class="o">()</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="c1">// derive rows and join them with original row
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="n">table</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">join</span><span class="o">(</span><span class="n">extractPrefs</span><span class="o">(</span>&#39;prefs<span class="o">)</span> <span class="n">as</span> <span class="o">(</span>&#39;color<span class="o">,</span> &#39;size<span class="o">))</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">select</span><span class="o">(</span>&#39;id<span class="o">,</span> &#39;username<span class="o">,</span> &#39;color<span class="o">,</span> &#39;size<span class="o">)</span>
</span></span></code></pre></div><p>The <code>PropertiesExtractor</code> is a user-defined table function that extracts the color and size. We are not interested in customers that haven’t set these preferences and thus don’t emit anything unless both properties are present in the string value. Since we are using a (cross) join in the program, customers without a result on the right side of the join will be filtered out.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-scala" data-lang="scala"><span class="line"><span class="cl"><span class="k">class</span> <span class="nc">PropertiesExtractor</span> <span class="k">extends</span> <span class="nc">TableFunction</span><span class="o">[</span><span class="kt">Row</span><span class="o">]</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="k">def</span> <span class="n">eval</span><span class="o">(</span><span class="n">prefs</span><span class="k">:</span> <span class="kt">String</span><span class="o">)</span><span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="c1">// split string into (key, value) pairs
</span></span></span><span class="line"><span class="cl"><span class="c1"></span> <span class="k">val</span> <span class="n">pairs</span> <span class="k">=</span> <span class="n">prefs</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">&#34;,&#34;</span><span class="o">)</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">map</span> <span class="o">{</span> <span class="n">kv</span> <span class="k">=&gt;</span>
</span></span><span class="line"><span class="cl"> <span class="k">val</span> <span class="n">split</span> <span class="k">=</span> <span class="n">kv</span><span class="o">.</span><span class="n">split</span><span class="o">(</span><span class="s">&#34;=&#34;</span><span class="o">)</span>
</span></span><span class="line"><span class="cl"> <span class="o">(</span><span class="n">split</span><span class="o">(</span><span class="mi">0</span><span class="o">),</span> <span class="n">split</span><span class="o">(</span><span class="mi">1</span><span class="o">))</span>
</span></span><span class="line"><span class="cl"> <span class="o">}</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">val</span> <span class="n">color</span> <span class="k">=</span> <span class="n">pairs</span><span class="o">.</span><span class="n">find</span><span class="o">(</span><span class="nc">_</span><span class="o">.</span><span class="nc">_1</span> <span class="o">==</span> <span class="s">&#34;color&#34;</span><span class="o">).</span><span class="n">map</span><span class="o">(</span><span class="nc">_</span><span class="o">.</span><span class="nc">_2</span><span class="o">)</span>
</span></span><span class="line"><span class="cl"> <span class="k">val</span> <span class="n">size</span> <span class="k">=</span> <span class="n">pairs</span><span class="o">.</span><span class="n">find</span><span class="o">(</span><span class="nc">_</span><span class="o">.</span><span class="nc">_1</span> <span class="o">==</span> <span class="s">&#34;size&#34;</span><span class="o">).</span><span class="n">map</span><span class="o">(</span><span class="nc">_</span><span class="o">.</span><span class="nc">_2</span><span class="o">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1">// emit a row if color and size are specified
</span></span></span><span class="line"><span class="cl"><span class="c1"></span> <span class="o">(</span><span class="n">color</span><span class="o">,</span> <span class="n">size</span><span class="o">)</span> <span class="k">match</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="k">case</span> <span class="o">(</span><span class="nc">Some</span><span class="o">(</span><span class="n">c</span><span class="o">),</span> <span class="nc">Some</span><span class="o">(</span><span class="n">s</span><span class="o">))</span> <span class="k">=&gt;</span> <span class="n">collect</span><span class="o">(</span><span class="nc">Row</span><span class="o">.</span><span class="n">of</span><span class="o">(</span><span class="n">c</span><span class="o">,</span> <span class="n">s</span><span class="o">))</span>
</span></span><span class="line"><span class="cl"> <span class="k">case</span> <span class="k">_</span> <span class="k">=&gt;</span> <span class="c1">// skip
</span></span></span><span class="line"><span class="cl"><span class="c1"></span> <span class="o">}</span>
</span></span><span class="line"><span class="cl"> <span class="o">}</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">override</span> <span class="k">def</span> <span class="n">getResultType</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">RowTypeInfo</span><span class="o">(</span><span class="nc">Types</span><span class="o">.</span><span class="nc">STRING</span><span class="o">,</span> <span class="nc">Types</span><span class="o">.</span><span class="nc">STRING</span><span class="o">)</span>
</span></span><span class="line"><span class="cl"><span class="o">}</span>
</span></span></code></pre></div><h2 id="conclusion">
Conclusion
<a class="anchor" href="#conclusion">#</a>
</h2>
<p>There is significant interest in making streaming more accessible and easier to use. Flink’s Table API development is happening quickly, and we believe that soon, you will be able to implement large batch or streaming pipelines using purely relational APIs or even convert existing Flink jobs to table programs. The Table API is already a very useful tool since you can work around limitations and missing features at any time by switching back and forth between the DataSet/DataStream abstractions and the Table abstraction.</p>
<p>Contributions such as support for Apache Hive UDFs, external catalogs, more TableSources, additional windows, and more operators will make the Table API an even more useful tool. In particular, the upcoming introduction of Dynamic Tables, which is worth a blog post of its own, shows that even in 2017, new relational APIs open the door to a number of possibilities.</p>
<p>Try it out, or even better, join the design discussions on the <a href="http://flink.apache.org/community.html#mailing-lists">mailing lists</a> and <a href="https://issues.apache.org/jira/browse/FLINK/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel">JIRA</a> and start contributing!</p>
</p>
</article>
<div class="edit-this-page">
<p>
<a href="https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications">Want to contribute translation?</a>
</p>
<p>
<a href="//github.com/apache/flink-web/edit/asf-site/docs/content/posts/2017-03-29-table-sql-api-update.md">
Edit This Page<i class="fa fa-edit fa-fw"></i>
</a>
</p>
</div>
</section>
<aside class="book-toc">
<nav id="TableOfContents"><h3>On This Page <a href="javascript:void(0)" class="toc" onclick="collapseToc()"><i class="fa fa-times" aria-hidden="true"></i></a></h3>
<ul>
<li>
<ul>
<li><a href="#data-transformation-and-etl">Data Transformation and ETL</a></li>
<li><a href="#unified-windowing-for-static-and-streaming-data">Unified Windowing for Static and Streaming Data</a></li>
<li><a href="#user-defined-table-functions">User-defined Table Functions</a></li>
<li><a href="#conclusion">Conclusion</a></li>
</ul>
</li>
</ul>
</nav>
</aside>
<aside class="expand-toc hidden">
<a class="toc" onclick="expandToc()" href="javascript:void(0)">
<i class="fa fa-bars" aria-hidden="true"></i>
</a>
</aside>
</main>
<footer>
<div class="separator"></div>
<div class="panels">
<div class="wrapper">
<div class="panel">
<ul>
<li>
<a href="https://flink-packages.org/">flink-packages.org</a>
</li>
<li>
<a href="https://www.apache.org/">Apache Software Foundation</a>
</li>
<li>
<a href="https://www.apache.org/licenses/">License</a>
</li>
<li>
<a href="/zh/">
<i class="fa fa-globe" aria-hidden="true"></i>&nbsp;中文版
</a>
</li>
</ul>
</div>
<div class="panel">
<ul>
<li>
<a href="/what-is-flink/security">Security</a-->
</li>
<li>
<a href="https://www.apache.org/foundation/sponsorship.html">Donate</a>
</li>
<li>
<a href="https://www.apache.org/foundation/thanks.html">Thanks</a>
</li>
</ul>
</div>
<div class="panel icons">
<div>
<a href="/posts">
<div class="icon flink-blog-icon"></div>
<span>Flink blog</span>
</a>
</div>
<div>
<a href="https://github.com/apache/flink">
<div class="icon flink-github-icon"></div>
<span>Github</span>
</a>
</div>
<div>
<a href="https://twitter.com/apacheflink">
<div class="icon flink-twitter-icon"></div>
<span>Twitter</span>
</a>
</div>
</div>
</div>
</div>
<hr/>
<div class="container disclaimer">
<p>The contents of this website are © 2024 Apache Software Foundation under the terms of the Apache License v2. Apache Flink, Flink, and the Flink logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.</p>
</div>
</footer>
</body>
</html>