blob: 41dfdb160b111e28f24f6b0edda157e882f8e54b [file] [log] [blame]
<!DOCTYPE html>
<html lang="en" dir=ZgotmplZ>
<head>
<link rel="stylesheet" href="/bootstrap/css/bootstrap.min.css">
<script src="/bootstrap/js/bootstrap.bundle.min.js"></script>
<link rel="stylesheet" type="text/css" href="/font-awesome/css/font-awesome.min.css">
<script src="/js/anchor.min.js"></script>
<script src="/js/flink.js"></script>
<link rel="canonical" href="https://flink.apache.org/2019/05/14/flux-capacitor-huh-temporal-tables-and-joins-in-streaming-sql/">
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="description" content="Figuring out how to manage and model temporal data for effective point-in-time analysis was a longstanding battle, dating as far back as the early 80’s, that culminated with the introduction of temporal tables in the SQL standard in 2011. Up to that point, users were doomed to implement this as part of the application logic, often hurting the length of the development lifecycle as well as the maintainability of the code.">
<meta name="theme-color" content="#FFFFFF"><meta property="og:title" content="Flux capacitor, huh? Temporal Tables and Joins in Streaming SQL" />
<meta property="og:description" content="Figuring out how to manage and model temporal data for effective point-in-time analysis was a longstanding battle, dating as far back as the early 80’s, that culminated with the introduction of temporal tables in the SQL standard in 2011. Up to that point, users were doomed to implement this as part of the application logic, often hurting the length of the development lifecycle as well as the maintainability of the code." />
<meta property="og:type" content="article" />
<meta property="og:url" content="https://flink.apache.org/2019/05/14/flux-capacitor-huh-temporal-tables-and-joins-in-streaming-sql/" /><meta property="article:section" content="posts" />
<meta property="article:published_time" content="2019-05-14T12:00:00+00:00" />
<meta property="article:modified_time" content="2019-05-14T12:00:00+00:00" />
<title>Flux capacitor, huh? Temporal Tables and Joins in Streaming SQL | Apache Flink</title>
<link rel="manifest" href="/manifest.json">
<link rel="icon" href="/favicon.png" type="image/x-icon">
<link rel="stylesheet" href="/book.min.22eceb4d17baa9cdc0f57345edd6f215a40474022dfee39b63befb5fb3c596b5.css" integrity="sha256-IuzrTRe6qc3A9XNF7dbyFaQEdAIt/uObY777X7PFlrU=">
<script defer src="/en.search.min.2698f0d1b683dae4d6cb071668b310a55ebcf1c48d11410a015a51d90105b53e.js" integrity="sha256-Jpjw0baD2uTWywcWaLMQpV688cSNEUEKAVpR2QEFtT4="></script>
<!--
Made with Book Theme
https://github.com/alex-shpak/hugo-book
-->
<meta name="generator" content="Hugo 0.124.1">
<script>
var _paq = window._paq = window._paq || [];
_paq.push(['disableCookies']);
_paq.push(["setDomains", ["*.flink.apache.org","*.nightlies.apache.org/flink"]]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
(function() {
var u="//analytics.apache.org/";
_paq.push(['setTrackerUrl', u+'matomo.php']);
_paq.push(['setSiteId', '1']);
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
})();
</script>
</head>
<body dir=ZgotmplZ>
<header>
<nav class="navbar navbar-expand-xl">
<div class="container-fluid">
<a class="navbar-brand" href="/">
<img src="/img/logo/png/100/flink_squirrel_100_color.png" alt="Apache Flink" height="47" width="47" class="d-inline-block align-text-middle">
<span>Apache Flink</span>
</a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent" aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
<i class="fa fa-bars navbar-toggler-icon"></i>
</button>
<div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav">
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">About</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/what-is-flink/flink-architecture/">Architecture</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-applications/">Applications</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-operations/">Operations</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/use-cases/">Use Cases</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/powered-by/">Powered By</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/roadmap/">Roadmap</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/community/">Community & Project Info</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/security/">Security</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/special-thanks/">Special Thanks</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Getting Started</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/">With Flink<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/docs/try-flink-kubernetes-operator/quick-start/">With Flink Kubernetes Operator<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable/docs/get-started/introduction/">With Flink CDC<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/docs/try-flink-ml/quick-start/">With Flink ML<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/getting-started/project-setup.html">With Flink Stateful Functions<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/overview/">Training Course<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Documentation</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/">Flink 1.19 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-master/">Flink Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/">Kubernetes Operator 1.8 (latest)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main">Kubernetes Operator Main (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable">CDC 3.0 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-master">CDC Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/">ML 2.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-master">ML Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/">Stateful Functions 3.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-master">Stateful Functions Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">How to Contribute</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/how-to-contribute/overview/">Overview</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-code/">Contribute Code</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/reviewing-prs/">Review Pull Requests</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/code-style-and-quality-preamble/">Code Style and Quality Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-documentation/">Contribute Documentation</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/documentation-style-guide/">Documentation Style Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/improve-website/">Contribute to the Website</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/getting-help/">Getting Help</a>
</li>
</ul>
</li>
<li class="nav-item">
<a class="nav-link" href="/posts/">Flink Blog</a>
</li>
<li class="nav-item">
<a class="nav-link" href="/downloads/">Downloads</a>
</li>
</ul>
<div class="book-search">
<div class="book-search-spinner hidden">
<i class="fa fa-refresh fa-spin"></i>
</div>
<form class="search-bar d-flex" onsubmit="return false;"su>
<input type="text" id="book-search-input" placeholder="Search" aria-label="Search" maxlength="64" data-hotkeys="s/">
<i class="fa fa-search search"></i>
<i class="fa fa-circle-o-notch fa-spin spinner"></i>
</form>
<div class="book-search-spinner hidden"></div>
<ul id="book-search-results"></ul>
</div>
</div>
</div>
</nav>
<div class="navbar-clearfix"></div>
</header>
<main class="flex">
<section class="container book-page">
<article class="markdown">
<h1>
<a href="/2019/05/14/flux-capacitor-huh-temporal-tables-and-joins-in-streaming-sql/">Flux capacitor, huh? Temporal Tables and Joins in Streaming SQL</a>
</h1>
May 14, 2019 -
Marta Paes
<a href="https://twitter.com/morsapaes">(@morsapaes)</a>
<p><p>Figuring out how to manage and model temporal data for effective point-in-time analysis was a longstanding battle, dating as far back as the early 80’s, that culminated with the introduction of temporal tables in the SQL standard in 2011. Up to that point, users were doomed to implement this as part of the application logic, often hurting the length of the development lifecycle as well as the maintainability of the code. And, although there isn’t a single, commonly accepted definition of <strong>temporal data</strong>, the challenge it represents is one and the same: how do we validate or enrich data against dynamically changing, historical datasets?</p>
<center>
<img src="/img/blog/2019-05-13-temporal-tables/TemporalTables1.png" width="500px" alt="Taxi Fares and Conversion Rates"/>
</center>
<br>
<p><strong>For example:</strong> given a stream with Taxi Fare events tied to the local currency of the ride location, we might want to convert the fare price to a common currency for further processing. As conversion rates excel at fluctuating over time, each Taxi Fare event would need to be matched to the rate that was valid at the time the event occurred in order to produce a reliable result.</p>
<h2 id="modelling-temporal-data-with-flink">
Modelling Temporal Data with Flink
<a class="anchor" href="#modelling-temporal-data-with-flink">#</a>
</h2>
<p>In the 1.7 release, Flink has introduced the concept of <strong>temporal tables</strong> into its streaming SQL and Table API: parameterized views on append-only tables — or, any table that only allows records to be inserted, never updated or deleted — that are interpreted as a changelog and keep data closely tied to time context, so that it can be interpreted as valid only within a specific period of time. Transforming a stream into a temporal table requires:</p>
<ul>
<li>
<p>Defining a <strong>primary key</strong> and a <strong>versioning field</strong> that can be used to keep track of the changes that happen over time;</p>
</li>
<li>
<p>Exposing the stream as a <strong>temporal table function</strong> that maps each point in time to a static relation.</p>
</li>
</ul>
<p>Going back to our example use case, a temporal table is just what we need to model the conversion rate data such as to make it useful for point-in-time querying. Temporal table functions are implemented as an extension of Flink’s generic <a href="//nightlies.apache.org/flink/flink-docs-release-1.8/dev/table/udfs.html#table-functions">table function</a> class and can be defined in the same straightforward way to be used with the Table API or SQL parser.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.table.functions.TemporalTableFunction</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">(...)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// Get the stream and table environments.</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">StreamExecutionEnvironment</span><span class="w"> </span><span class="n">env</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">StreamExecutionEnvironment</span><span class="p">.</span><span class="na">getExecutionEnvironment</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">StreamTableEnvironment</span><span class="w"> </span><span class="n">tEnv</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">StreamTableEnvironment</span><span class="p">.</span><span class="na">getTableEnvironment</span><span class="p">(</span><span class="n">env</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// Provide a sample static data set of the rates history table.</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">List</span><span class="w"> </span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="p">,</span><span class="w"> </span><span class="n">Long</span><span class="o">&gt;&gt;</span><span class="n">ratesHistoryData</span><span class="w"> </span><span class="o">=</span><span class="k">new</span><span class="w"> </span><span class="n">ArrayList</span><span class="o">&lt;&gt;</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">ratesHistoryData</span><span class="p">.</span><span class="na">add</span><span class="p">(</span><span class="n">Tuple2</span><span class="p">.</span><span class="na">of</span><span class="p">(</span><span class="s">&#34;USD&#34;</span><span class="p">,</span><span class="w"> </span><span class="n">102L</span><span class="p">));</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">ratesHistoryData</span><span class="p">.</span><span class="na">add</span><span class="p">(</span><span class="n">Tuple2</span><span class="p">.</span><span class="na">of</span><span class="p">(</span><span class="s">&#34;EUR&#34;</span><span class="p">,</span><span class="w"> </span><span class="n">114L</span><span class="p">));</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">ratesHistoryData</span><span class="p">.</span><span class="na">add</span><span class="p">(</span><span class="n">Tuple2</span><span class="p">.</span><span class="na">of</span><span class="p">(</span><span class="s">&#34;YEN&#34;</span><span class="p">,</span><span class="w"> </span><span class="n">1L</span><span class="p">));</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">ratesHistoryData</span><span class="p">.</span><span class="na">add</span><span class="p">(</span><span class="n">Tuple2</span><span class="p">.</span><span class="na">of</span><span class="p">(</span><span class="s">&#34;EUR&#34;</span><span class="p">,</span><span class="w"> </span><span class="n">116L</span><span class="p">));</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">ratesHistoryData</span><span class="p">.</span><span class="na">add</span><span class="p">(</span><span class="n">Tuple2</span><span class="p">.</span><span class="na">of</span><span class="p">(</span><span class="s">&#34;USD&#34;</span><span class="p">,</span><span class="w"> </span><span class="n">105L</span><span class="p">));</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// Create and register an example table using the sample data set.</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="p">,</span><span class="w"> </span><span class="n">Long</span><span class="o">&gt;&gt;</span><span class="w"> </span><span class="n">ratesHistoryStream</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">env</span><span class="p">.</span><span class="na">fromCollection</span><span class="p">(</span><span class="n">ratesHistoryData</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">Table</span><span class="w"> </span><span class="n">ratesHistory</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">tEnv</span><span class="p">.</span><span class="na">fromDataStream</span><span class="p">(</span><span class="n">ratesHistoryStream</span><span class="p">,</span><span class="w"> </span><span class="s">&#34;r_currency, r_rate, r_proctime.proctime&#34;</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">tEnv</span><span class="p">.</span><span class="na">registerTable</span><span class="p">(</span><span class="s">&#34;RatesHistory&#34;</span><span class="p">,</span><span class="w"> </span><span class="n">ratesHistory</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// Create and register the temporal table function &#34;rates&#34;.</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// Define &#34;r_proctime&#34; as the versioning field and &#34;r_currency&#34; as the primary key.</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">TemporalTableFunction</span><span class="w"> </span><span class="n">rates</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">ratesHistory</span><span class="p">.</span><span class="na">createTemporalTableFunction</span><span class="p">(</span><span class="s">&#34;r_proctime&#34;</span><span class="p">,</span><span class="w"> </span><span class="s">&#34;r_currency&#34;</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">tEnv</span><span class="p">.</span><span class="na">registerFunction</span><span class="p">(</span><span class="s">&#34;Rates&#34;</span><span class="p">,</span><span class="w"> </span><span class="n">rates</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">(...)</span><span class="w">
</span></span></span></code></pre></div><p>What does this <strong>Rates</strong> function do, in practice? Imagine we would like to check what the conversion rates looked like at a given time — say, 11:00. We could simply do something like:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sql" data-lang="sql"><span class="line"><span class="cl"><span class="k">SELECT</span><span class="w"> </span><span class="o">*</span><span class="w"> </span><span class="k">FROM</span><span class="w"> </span><span class="n">Rates</span><span class="p">(</span><span class="s1">&#39;11:00&#39;</span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><center>
<img src="/img/blog/2019-05-13-temporal-tables/TemporalTables2.png" width="650px" alt="Point-in-time Querying"/>
</center>
<br>
<p>Even though Flink does not yet support querying temporal table functions with a constant time attribute parameter, these functions can be used to cover a much more interesting scenario: temporal table joins.</p>
<h2 id="streaming-joins-using-temporal-tables">
Streaming Joins using Temporal Tables
<a class="anchor" href="#streaming-joins-using-temporal-tables">#</a>
</h2>
<p>Temporal tables reach their full potential when used in combination — erm, joined — with streaming data, for instance to power applications that must continuously whitelist against a reference dataset that changes over time for auditing or regulatory compliance. While efficient joins have long been an enduring challenge for query processors due to computational cost and resource consumption, joins over streaming data carry some additional challenges:</p>
<ul>
<li>The <strong>unbounded</strong> nature of streams means that inputs are continuously evaluated and intermediate join results can consume memory resources indefinitely. Flink gracefully manages its memory consumption out-of-the-box (even for heavier cases where joins require spilling to disk) and supports time-windowed joins to bound the amount of data that needs to be kept around as state;</li>
<li>Streaming data might be <strong>out-of-order</strong> and <strong>late</strong>, so it is not possible to enforce an ordering upfront and time handling requires some thinking to avoid unnecessary outputs and retractions.</li>
</ul>
<p>In the particular case of temporal data, time-windowed joins are not enough (well, at least not without getting into some expensive tweaking): sooner or later, each reference record will fall outside of the window and be wiped from state, no longer being considered for future join results. To address this limitation, Flink has introduced support for temporal table joins to cover time-varying relations.</p>
<center>
<img src="/img/blog/2019-05-13-temporal-tables/TemporalTables3.png" width="500px" alt="Temporal Table Join between Taxi Fares and Conversion Rates"/>
</center>
<br>
<p>Each record from the append-only table on the probe side (<code>Taxi Fare</code>) is joined with the version of the record from the temporal table on the build side (<code>Conversion Rate</code>) that most closely matches the probe side record time attribute (<code>time</code>) for the same value of the primary key (<code>currency</code>). Remember the temporal table function (<code>Rates</code>) we registered earlier? It can now be used to express this join as a simple SQL statement that would otherwise require a heavier statement with a subquery.</p>
<center>
<img src="/img/blog/2019-05-13-temporal-tables/TemporalTables4.png" width="700px" alt="Regular Join vs. Temporal Table Join"/>
</center>
<br>
<p>Temporal table joins support both <a href="//nightlies.apache.org/flink/flink-docs-release-1.8/dev/table/streaming/joins.html#processing-time-temporal-joins">processing</a> and <a href="//nightlies.apache.org/flink/flink-docs-release-1.8/dev/table/streaming/joins.html#event-time-temporal-joins">event time</a> semantics and effectively limit the amount of data kept in state while also allowing records on the build side to be arbitrarily old, as opposed to time-windowed joins. Probe-side records only need to be kept in state for a very short time to ensure correct semantics in presence of out-of-order records. The challenges mentioned in the beginning of this section are overcome by:</p>
<ul>
<li>Narrowing the <strong>scope</strong> of the join: only the time-matching version of <code>ratesHistory</code> is visible for a given <code>taxiFare.time</code>;</li>
<li>Pruning <strong>unneeded records</strong> from state: for cases using event time, records between current time and the <a href="//nightlies.apache.org/flink/flink-docs-release-1.8/dev/event_time.html#event-time-and-watermarks">watermark</a> delay are persisted for both the probe and build side. These are discarded as soon as the watermark arrives and the results are emitted — allowing the join operation to move forward in time and the build table to “refresh” its version in state.</li>
</ul>
<h2 id="conclusion">
Conclusion
<a class="anchor" href="#conclusion">#</a>
</h2>
<p>All this means it is now possible to express continuous stream enrichment in relational and time-varying terms using Flink without dabbling into syntactic patchwork or compromising performance. In other words: stream time-travelling minus the flux capacitor. Extending this syntax to batch processing for enriching historic data with proper (event) time semantics is also part of the Flink roadmap!</p>
<p>If you&rsquo;d like to get some <strong>hands-on practice in joining streams with Flink SQL</strong> (and Flink SQL in general), checkout this <a href="https://github.com/ververica/sql-training/wiki">free training for Flink SQL</a>. The training environment is based on Docker and set up in just a few minutes.</p>
<p>Subscribe to the <a href="/community.html#mailing-lists">Apache Flink mailing lists</a> to stay up-to-date with the latest developments in this space.</p>
</p>
</article>
<div class="edit-this-page">
<p>
<a href="https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications">Want to contribute translation?</a>
</p>
<p>
<a href="//github.com/apache/flink-web/edit/asf-site/docs/content/posts/2019-05-14-temporal-tables.md">
Edit This Page<i class="fa fa-edit fa-fw"></i>
</a>
</p>
</div>
</section>
<aside class="book-toc">
<nav id="TableOfContents"><h3>On This Page <a href="javascript:void(0)" class="toc" onclick="collapseToc()"><i class="fa fa-times" aria-hidden="true"></i></a></h3>
<ul>
<li>
<ul>
<li><a href="#modelling-temporal-data-with-flink">Modelling Temporal Data with Flink</a></li>
<li><a href="#streaming-joins-using-temporal-tables">Streaming Joins using Temporal Tables</a></li>
<li><a href="#conclusion">Conclusion</a></li>
</ul>
</li>
</ul>
</nav>
</aside>
<aside class="expand-toc hidden">
<a class="toc" onclick="expandToc()" href="javascript:void(0)">
<i class="fa fa-bars" aria-hidden="true"></i>
</a>
</aside>
</main>
<footer>
<div class="separator"></div>
<div class="panels">
<div class="wrapper">
<div class="panel">
<ul>
<li>
<a href="https://flink-packages.org/">flink-packages.org</a>
</li>
<li>
<a href="https://www.apache.org/">Apache Software Foundation</a>
</li>
<li>
<a href="https://www.apache.org/licenses/">License</a>
</li>
<li>
<a href="/zh/">
<i class="fa fa-globe" aria-hidden="true"></i>&nbsp;中文版
</a>
</li>
</ul>
</div>
<div class="panel">
<ul>
<li>
<a href="/what-is-flink/security">Security</a-->
</li>
<li>
<a href="https://www.apache.org/foundation/sponsorship.html">Donate</a>
</li>
<li>
<a href="https://www.apache.org/foundation/thanks.html">Thanks</a>
</li>
</ul>
</div>
<div class="panel icons">
<div>
<a href="/posts">
<div class="icon flink-blog-icon"></div>
<span>Flink blog</span>
</a>
</div>
<div>
<a href="https://github.com/apache/flink">
<div class="icon flink-github-icon"></div>
<span>Github</span>
</a>
</div>
<div>
<a href="https://twitter.com/apacheflink">
<div class="icon flink-twitter-icon"></div>
<span>Twitter</span>
</a>
</div>
</div>
</div>
</div>
<hr/>
<div class="container disclaimer">
<p>The contents of this website are © 2024 Apache Software Foundation under the terms of the Apache License v2. Apache Flink, Flink, and the Flink logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.</p>
</div>
</footer>
</body>
</html>