<!DOCTYPE HTML>
<html lang="en-US">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<meta charset="UTF-8">
<title>Streaming</title>
<meta name="viewport" content="width=device-width,initial-scale=1">
<meta name="generator" content="Jekyll v4.2.2">
<link rel="stylesheet" href="//fonts.googleapis.com/css?family=Lato:300,300italic,400,400italic,700,700italic,900">
<link rel="stylesheet" href="/css/screen.css">
<link rel="icon" type="image/x-icon" href="/favicon.ico">
</head>
<body class="wrap">
<header role="banner">
<div class="grid">
<div class="unit center-on-mobiles">
<h1>
<a href="/">
<span class="sr-only">Apache Calcite</span>
<img src="/img/logo.svg" alt="Calcite Logo">
</a>
</h1>
</div>
<nav class="main-nav">
<ul>
<li class="">
<a href="/">Home</a>
</li>
<li class="">
<a href="/downloads/">Download</a>
</li>
<li class="">
<a href="/community/">Community</a>
</li>
<li class="">
<a href="/develop/">Develop</a>
</li>
<li class="">
<a href="/news/">News</a>
</li>
<li class="current">
<a href="/docs/">Docs</a>
</li>
</ul>
</nav>
</div>
</header>
<section class="docs">
<div class="grid">
<div class="unit four-fifths">
<article>
<h1>Streaming</h1>
<p>Calcite has extended SQL and relational algebra in order to support
streaming queries.</p>
<ul id="markdown-toc">
<li><a href="#introduction" id="markdown-toc-introduction">Introduction</a></li>
<li><a href="#an-example-schema" id="markdown-toc-an-example-schema">An example schema</a></li>
<li><a href="#a-simple-query" id="markdown-toc-a-simple-query">A simple query</a></li>
<li><a href="#filtering-rows" id="markdown-toc-filtering-rows">Filtering rows</a></li>
<li><a href="#projecting-expressions" id="markdown-toc-projecting-expressions">Projecting expressions</a></li>
<li><a href="#tumbling-windows" id="markdown-toc-tumbling-windows">Tumbling windows</a></li>
<li><a href="#tumbling-windows-improved" id="markdown-toc-tumbling-windows-improved">Tumbling windows, improved</a></li>
<li><a href="#hopping-windows" id="markdown-toc-hopping-windows">Hopping windows</a></li>
<li><a href="#grouping-sets" id="markdown-toc-grouping-sets">GROUPING SETS</a></li>
<li><a href="#filtering-after-aggregation" id="markdown-toc-filtering-after-aggregation">Filtering after aggregation</a></li>
<li><a href="#sub-queries-views-and-sqls-closure-property" id="markdown-toc-sub-queries-views-and-sqls-closure-property">Sub-queries, views and SQL’s closure property</a></li>
<li><a href="#converting-between-streams-and-relations" id="markdown-toc-converting-between-streams-and-relations">Converting between streams and relations</a></li>
<li><a href="#the-pie-chart-problem-relational-queries-on-streams" id="markdown-toc-the-pie-chart-problem-relational-queries-on-streams">The “pie chart” problem: Relational queries on streams</a></li>
<li><a href="#sorting" id="markdown-toc-sorting">Sorting</a></li>
<li><a href="#table-constructor" id="markdown-toc-table-constructor">Table constructor</a></li>
<li><a href="#sliding-windows" id="markdown-toc-sliding-windows">Sliding windows</a></li>
<li><a href="#cascading-windows" id="markdown-toc-cascading-windows">Cascading windows</a></li>
<li><a href="#joining-streams-to-tables" id="markdown-toc-joining-streams-to-tables">Joining streams to tables</a></li>
<li><a href="#joining-streams-to-streams" id="markdown-toc-joining-streams-to-streams">Joining streams to streams</a></li>
<li><a href="#dml" id="markdown-toc-dml">DML</a></li>
<li><a href="#punctuation" id="markdown-toc-punctuation">Punctuation</a></li>
<li>
<a href="#state-of-the-stream" id="markdown-toc-state-of-the-stream">State of the stream</a> <ul>
<li><a href="#implemented" id="markdown-toc-implemented">Implemented</a></li>
<li><a href="#not-implemented" id="markdown-toc-not-implemented">Not implemented</a></li>
<li><a href="#to-do-in-this-document" id="markdown-toc-to-do-in-this-document">To do in this document</a></li>
</ul>
</li>
<li><a href="#functions" id="markdown-toc-functions">Functions</a></li>
<li><a href="#references" id="markdown-toc-references">References</a></li>
</ul>
<h2 id="introduction">Introduction</h2>
<p>Streams are collections of records that flow continuously, and forever.
Unlike tables, they are not typically stored on disk, but flow over the
network and are held for short periods of time in memory.</p>
<p>Streams complement tables because they represent what is happening in the
present and future of the enterprise whereas tables represent the past.
It is very common for a stream to be archived into a table.</p>
<p>As with tables, you often want to query streams in a high-level language
based on relational algebra, validated according to a schema, and optimized
to take advantage of available resources and algorithms.</p>
<p>Calcite’s SQL is an extension to standard SQL, not another ‘SQL-like’ language.
The distinction is important, for several reasons:</p>
<ul>
<li>Streaming SQL is easy to learn for anyone who knows regular SQL.</li>
<li>The semantics are clear, because we aim to produce the same results on a
stream as if the same data were in a table.</li>
<li>You can write queries that combine streams and tables (or the history of
a stream, which is basically an in-memory table).</li>
<li>Lots of existing tools can generate standard SQL.</li>
</ul>
<p>If you don’t use the <code class="language-plaintext highlighter-rouge">STREAM</code> keyword, you are back in regular
standard SQL.</p>
<h2 id="an-example-schema">An example schema</h2>
<p>Our streaming SQL examples use the following schema:</p>
<ul>
<li>
<code class="language-plaintext highlighter-rouge">Orders (rowtime, productId, orderId, units)</code> - a stream and a table</li>
<li>
<code class="language-plaintext highlighter-rouge">Products (rowtime, productId, name)</code> - a table</li>
<li>
<code class="language-plaintext highlighter-rouge">Shipments (rowtime, orderId)</code> - a stream</li>
</ul>
<h2 id="a-simple-query">A simple query</h2>
<p>Let’s start with the simplest streaming query:</p>
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">SELECT</span> <span class="n">STREAM</span> <span class="o">*</span>
<span class="k">FROM</span> <span class="n">Orders</span><span class="p">;</span>
<span class="n">rowtime</span> <span class="o">|</span> <span class="n">productId</span> <span class="o">|</span> <span class="n">orderId</span> <span class="o">|</span> <span class="n">units</span>
<span class="c1">----------+-----------+---------+-------</span>
<span class="mi">10</span><span class="p">:</span><span class="mi">17</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">30</span> <span class="o">|</span> <span class="mi">5</span> <span class="o">|</span> <span class="mi">4</span>
<span class="mi">10</span><span class="p">:</span><span class="mi">17</span><span class="p">:</span><span class="mi">05</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">6</span> <span class="o">|</span> <span class="mi">1</span>
<span class="mi">10</span><span class="p">:</span><span class="mi">18</span><span class="p">:</span><span class="mi">05</span> <span class="o">|</span> <span class="mi">20</span> <span class="o">|</span> <span class="mi">7</span> <span class="o">|</span> <span class="mi">2</span>
<span class="mi">10</span><span class="p">:</span><span class="mi">18</span><span class="p">:</span><span class="mi">07</span> <span class="o">|</span> <span class="mi">30</span> <span class="o">|</span> <span class="mi">8</span> <span class="o">|</span> <span class="mi">20</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">02</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">9</span> <span class="o">|</span> <span class="mi">6</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">04</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">1</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">09</span><span class="p">:</span><span class="mi">30</span> <span class="o">|</span> <span class="mi">40</span> <span class="o">|</span> <span class="mi">11</span> <span class="o">|</span> <span class="mi">12</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">24</span><span class="p">:</span><span class="mi">11</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">12</span> <span class="o">|</span> <span class="mi">4</span></code></pre></figure>
<p>This query reads all columns and rows from the <code class="language-plaintext highlighter-rouge">Orders</code> stream.
Like any streaming query, it never terminates. It outputs a record whenever
a record arrives in <code class="language-plaintext highlighter-rouge">Orders</code>.</p>
<p>Type <code class="language-plaintext highlighter-rouge">Control-C</code> to terminate the query.</p>
<p>The <code class="language-plaintext highlighter-rouge">STREAM</code> keyword is the main extension in streaming SQL. It tells the
system that you are interested in incoming orders, not existing ones. The query</p>
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">SELECT</span> <span class="o">*</span>
<span class="k">FROM</span> <span class="n">Orders</span><span class="p">;</span>
<span class="n">rowtime</span> <span class="o">|</span> <span class="n">productId</span> <span class="o">|</span> <span class="n">orderId</span> <span class="o">|</span> <span class="n">units</span>
<span class="c1">----------+-----------+---------+-------</span>
<span class="mi">08</span><span class="p">:</span><span class="mi">30</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">1</span> <span class="o">|</span> <span class="mi">3</span>
<span class="mi">08</span><span class="p">:</span><span class="mi">45</span><span class="p">:</span><span class="mi">10</span> <span class="o">|</span> <span class="mi">20</span> <span class="o">|</span> <span class="mi">2</span> <span class="o">|</span> <span class="mi">1</span>
<span class="mi">09</span><span class="p">:</span><span class="mi">12</span><span class="p">:</span><span class="mi">21</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">3</span> <span class="o">|</span> <span class="mi">10</span>
<span class="mi">09</span><span class="p">:</span><span class="mi">27</span><span class="p">:</span><span class="mi">44</span> <span class="o">|</span> <span class="mi">30</span> <span class="o">|</span> <span class="mi">4</span> <span class="o">|</span> <span class="mi">2</span>
<span class="mi">4</span> <span class="n">records</span> <span class="n">returned</span><span class="p">.</span></code></pre></figure>
<p>is also valid, but will print out all existing orders and then terminate. We
call it a <em>relational</em> query, as opposed to <em>streaming</em>. It has traditional
SQL semantics.</p>
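<p>A minimal Python sketch of the difference, assuming a simple in-memory model: the table is a list of rows that already exist, and the stream is a <code class="language-plaintext highlighter-rouge">queue.Queue</code> that new rows arrive on. The names <code class="language-plaintext highlighter-rouge">orders_table</code>, <code class="language-plaintext highlighter-rouge">orders_stream</code>, <code class="language-plaintext highlighter-rouge">select_all</code> and <code class="language-plaintext highlighter-rouge">select_stream_all</code> are hypothetical illustrations, not part of Calcite’s API.</p>
<figure class="highlight"><pre><code class="language-python" data-lang="python">import queue

# Hypothetical model, not Calcite's API: the Orders table holds rows that
# already exist, and the Orders stream is a queue that new rows arrive on.
orders_table = [
    ("08:30:00", 10, 1, 3),
    ("08:45:10", 20, 2, 1),
    ("09:12:21", 10, 3, 10),
    ("09:27:44", 30, 4, 2),
]
orders_stream = queue.Queue()


def select_all(table):
    """Relational query: returns the existing rows, then terminates."""
    return list(table)


def select_stream_all(stream):
    """Streaming query: yields each row as it arrives and never finishes on its own."""
    while True:
        # Blocks until the next record arrives.
        yield stream.get()


print(len(select_all(orders_table)))  # prints 4

incoming = select_stream_all(orders_stream)
orders_stream.put(("10:17:00", 30, 5, 4))
print(next(incoming))  # prints ('10:17:00', 30, 5, 4)</code></pre></figure>
<p>The relational function returns once it has read the existing rows; the streaming one is a generator that runs until the caller stops consuming it, much as you would press Control-C.</p>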
<p><code class="language-plaintext highlighter-rouge">Orders</code> is special, in that it has both a stream and a table. If you try to run
a streaming query on a table, or a relational query on a stream, Calcite gives
an error:</p>
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">SELECT</span> <span class="o">*</span> <span class="k">FROM</span> <span class="n">Shipments</span><span class="p">;</span>
<span class="n">ERROR</span><span class="p">:</span> <span class="n">Cannot</span> <span class="k">convert</span> <span class="n">stream</span> <span class="s1">'SHIPMENTS'</span> <span class="k">to</span> <span class="n">a</span> <span class="k">table</span>
<span class="k">SELECT</span> <span class="n">STREAM</span> <span class="o">*</span> <span class="k">FROM</span> <span class="n">Products</span><span class="p">;</span>
<span class="n">ERROR</span><span class="p">:</span> <span class="n">Cannot</span> <span class="k">convert</span> <span class="k">table</span> <span class="s1">'PRODUCTS'</span> <span class="k">to</span> <span class="n">a</span> <span class="n">stream</span></code></pre></figure>
<h2 id="filtering-rows">Filtering rows</h2>
<p>Just as in regular SQL, you use a <code class="language-plaintext highlighter-rouge">WHERE</code> clause to filter rows:</p>
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">SELECT</span> <span class="n">STREAM</span> <span class="o">*</span>
<span class="k">FROM</span> <span class="n">Orders</span>
<span class="k">WHERE</span> <span class="n">units</span> <span class="o">&gt;</span> <span class="mi">3</span><span class="p">;</span>
<span class="n">rowtime</span> <span class="o">|</span> <span class="n">productId</span> <span class="o">|</span> <span class="n">orderId</span> <span class="o">|</span> <span class="n">units</span>
<span class="c1">----------+-----------+---------+-------</span>
<span class="mi">10</span><span class="p">:</span><span class="mi">17</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">30</span> <span class="o">|</span> <span class="mi">5</span> <span class="o">|</span> <span class="mi">4</span>
<span class="mi">10</span><span class="p">:</span><span class="mi">18</span><span class="p">:</span><span class="mi">07</span> <span class="o">|</span> <span class="mi">30</span> <span class="o">|</span> <span class="mi">8</span> <span class="o">|</span> <span class="mi">20</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">02</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">9</span> <span class="o">|</span> <span class="mi">6</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">09</span><span class="p">:</span><span class="mi">30</span> <span class="o">|</span> <span class="mi">40</span> <span class="o">|</span> <span class="mi">11</span> <span class="o">|</span> <span class="mi">12</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">24</span><span class="p">:</span><span class="mi">11</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">12</span> <span class="o">|</span> <span class="mi">4</span></code></pre></figure>
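<p>On a stream, <code class="language-plaintext highlighter-rouge">WHERE</code> is a row-at-a-time operation: each incoming row is either passed through immediately or discarded, and no state is kept between rows. A minimal Python sketch of that behavior, using the sample <code class="language-plaintext highlighter-rouge">Orders</code> rows as <code class="language-plaintext highlighter-rouge">(rowtime, productId, orderId, units)</code> tuples; <code class="language-plaintext highlighter-rouge">where</code> and <code class="language-plaintext highlighter-rouge">units_over_3</code> are hypothetical names, not a Calcite API:</p>
<figure class="highlight"><pre><code class="language-python" data-lang="python">import operator

# The sample Orders rows as (rowtime, productId, orderId, units) tuples.
ORDERS = [
    ("10:17:00", 30, 5, 4),
    ("10:17:05", 10, 6, 1),
    ("10:18:05", 20, 7, 2),
    ("10:18:07", 30, 8, 20),
    ("11:02:00", 10, 9, 6),
    ("11:04:00", 10, 10, 1),
    ("11:09:30", 40, 11, 12),
    ("11:24:11", 10, 12, 4),
]


def where(rows, predicate):
    """Streaming filter: yields each matching row as soon as it arrives."""
    for row in rows:
        if predicate(row):
            yield row


def units_over_3(row):
    # Same test as the WHERE clause: is units greater than 3?
    return operator.gt(row[3], 3)


for row in where(iter(ORDERS), units_over_3):
    print(row)</code></pre></figure>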
<h2 id="projecting-expressions">Projecting expressions</h2>
<p>Use expressions in the <code class="language-plaintext highlighter-rouge">SELECT</code> clause to choose which columns to return or
compute expressions:</p>
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">SELECT</span> <span class="n">STREAM</span> <span class="n">rowtime</span><span class="p">,</span>
<span class="s1">'An order for '</span> <span class="o">||</span> <span class="n">units</span> <span class="o">||</span> <span class="s1">' '</span>
<span class="o">||</span> <span class="k">CASE</span> <span class="n">units</span> <span class="k">WHEN</span> <span class="mi">1</span> <span class="k">THEN</span> <span class="s1">'unit'</span> <span class="k">ELSE</span> <span class="s1">'units'</span> <span class="k">END</span>
<span class="o">||</span> <span class="s1">' of product #'</span> <span class="o">||</span> <span class="n">productId</span> <span class="k">AS</span> <span class="n">description</span>
<span class="k">FROM</span> <span class="n">Orders</span><span class="p">;</span>
<span class="n">rowtime</span> <span class="o">|</span> <span class="n">description</span>
<span class="c1">----------+---------------------------------------</span>
<span class="mi">10</span><span class="p">:</span><span class="mi">17</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="n">An</span> <span class="k">order</span> <span class="k">for</span> <span class="mi">4</span> <span class="n">units</span> <span class="k">of</span> <span class="n">product</span> <span class="o">#</span><span class="mi">30</span>
<span class="mi">10</span><span class="p">:</span><span class="mi">17</span><span class="p">:</span><span class="mi">05</span> <span class="o">|</span> <span class="n">An</span> <span class="k">order</span> <span class="k">for</span> <span class="mi">1</span> <span class="n">unit</span> <span class="k">of</span> <span class="n">product</span> <span class="o">#</span><span class="mi">10</span>
<span class="mi">10</span><span class="p">:</span><span class="mi">18</span><span class="p">:</span><span class="mi">05</span> <span class="o">|</span> <span class="n">An</span> <span class="k">order</span> <span class="k">for</span> <span class="mi">2</span> <span class="n">units</span> <span class="k">of</span> <span class="n">product</span> <span class="o">#</span><span class="mi">20</span>
<span class="mi">10</span><span class="p">:</span><span class="mi">18</span><span class="p">:</span><span class="mi">07</span> <span class="o">|</span> <span class="n">An</span> <span class="k">order</span> <span class="k">for</span> <span class="mi">20</span> <span class="n">units</span> <span class="k">of</span> <span class="n">product</span> <span class="o">#</span><span class="mi">30</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">02</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="n">An</span> <span class="k">order</span> <span class="k">for</span> <span class="mi">6</span> <span class="n">units</span> <span class="k">of</span> <span class="n">product</span> <span class="o">#</span><span class="mi">10</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">04</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="n">An</span> <span class="k">order</span> <span class="k">for</span> <span class="mi">1</span> <span class="n">unit</span> <span class="k">of</span> <span class="n">product</span> <span class="o">#</span><span class="mi">10</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">09</span><span class="p">:</span><span class="mi">30</span> <span class="o">|</span> <span class="n">An</span> <span class="k">order</span> <span class="k">for</span> <span class="mi">12</span> <span class="n">units</span> <span class="k">of</span> <span class="n">product</span> <span class="o">#</span><span class="mi">40</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">24</span><span class="p">:</span><span class="mi">11</span> <span class="o">|</span> <span class="n">An</span> <span class="k">order</span> <span class="k">for</span> <span class="mi">4</span> <span class="n">units</span> <span class="k">of</span> <span class="n">product</span> <span class="o">#</span><span class="mi">10</span></code></pre></figure>
<p>We recommend that you always include the <code class="language-plaintext highlighter-rouge">rowtime</code> column in the <code class="language-plaintext highlighter-rouge">SELECT</code>
clause. Having a sorted timestamp in each stream and streaming query makes it
possible to do advanced calculations later, such as <code class="language-plaintext highlighter-rouge">GROUP BY</code> and <code class="language-plaintext highlighter-rouge">JOIN</code>.</p>
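<p>The projection is also a row-at-a-time operation. A rough Python equivalent of the query above, as a sketch only (<code class="language-plaintext highlighter-rouge">describe</code> and <code class="language-plaintext highlighter-rouge">project</code> are hypothetical names); note that it keeps <code class="language-plaintext highlighter-rouge">rowtime</code> as the first output column, as recommended:</p>
<figure class="highlight"><pre><code class="language-python" data-lang="python">def describe(units, product_id):
    """Builds the same text as the || concatenation and CASE expression in the query."""
    unit_word = "unit" if units == 1 else "units"
    return f"An order for {units} {unit_word} of product #{product_id}"


def project(rows):
    """Streaming projection: one output row per input row, rowtime kept first."""
    for rowtime, product_id, _order_id, units in rows:
        yield rowtime, describe(units, product_id)


sample = [("10:17:00", 30, 5, 4), ("10:17:05", 10, 6, 1)]
for rowtime, description in project(sample):
    print(rowtime, "|", description)</code></pre></figure>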
<h2 id="tumbling-windows">Tumbling windows</h2>
<p>There are several ways to compute aggregate functions on streams. The
differences are:</p>
<ul>
<li>How many rows come out for each row in?</li>
<li>Does each incoming value appear in one total, or more?</li>
<li>What defines the “window”, the set of rows that contribute to a given output row?</li>
<li>Is the result a stream or a relation?</li>
</ul>
<p>There are various window types:</p>
<ul>
<li>tumbling window (GROUP BY)</li>
<li>hopping window (multi GROUP BY)</li>
<li>sliding window (window functions)</li>
<li>cascading window (window functions)</li>
</ul>
<p>and the following diagram shows the kinds of query in which to use them:</p>
<p><img src="/img/window-types.png" alt="Window types"></p>
<p>First we’ll look at a <em>tumbling window</em>, which is defined by a streaming
<code class="language-plaintext highlighter-rouge">GROUP BY</code>. Here is an example:</p>
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">SELECT</span> <span class="n">STREAM</span> <span class="n">CEIL</span><span class="p">(</span><span class="n">rowtime</span> <span class="k">TO</span> <span class="n">HOUR</span><span class="p">)</span> <span class="k">AS</span> <span class="n">rowtime</span><span class="p">,</span>
<span class="n">productId</span><span class="p">,</span>
<span class="k">COUNT</span><span class="p">(</span><span class="o">*</span><span class="p">)</span> <span class="k">AS</span> <span class="k">c</span><span class="p">,</span>
<span class="k">SUM</span><span class="p">(</span><span class="n">units</span><span class="p">)</span> <span class="k">AS</span> <span class="n">units</span>
<span class="k">FROM</span> <span class="n">Orders</span>
<span class="k">GROUP</span> <span class="k">BY</span> <span class="n">CEIL</span><span class="p">(</span><span class="n">rowtime</span> <span class="k">TO</span> <span class="n">HOUR</span><span class="p">),</span> <span class="n">productId</span><span class="p">;</span>
<span class="n">rowtime</span> <span class="o">|</span> <span class="n">productId</span> <span class="o">|</span> <span class="k">c</span> <span class="o">|</span> <span class="n">units</span>
<span class="c1">----------+-----------+---------+-------</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">00</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">30</span> <span class="o">|</span> <span class="mi">2</span> <span class="o">|</span> <span class="mi">24</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">00</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">1</span> <span class="o">|</span> <span class="mi">1</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">00</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">20</span> <span class="o">|</span> <span class="mi">1</span> <span class="o">|</span> <span class="mi">2</span>
<span class="mi">12</span><span class="p">:</span><span class="mi">00</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">3</span> <span class="o">|</span> <span class="mi">11</span>
<span class="mi">12</span><span class="p">:</span><span class="mi">00</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">40</span> <span class="o">|</span> <span class="mi">1</span> <span class="o">|</span> <span class="mi">12</span></code></pre></figure>
<p>The result is a stream. At 11 o’clock, Calcite emits a sub-total for every
<code class="language-plaintext highlighter-rouge">productId</code> that had an order since 10 o’clock, timestamped 11 o’clock.
At 12 o’clock, it emits sub-totals for the orders that occurred between 11:00 and 12:00.
Each input row contributes to only one output row.</p>
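<p>Leaving aside <em>when</em> each row is emitted, the values themselves are an ordinary grouped aggregation. A minimal Python sketch that computes them from the sample <code class="language-plaintext highlighter-rouge">Orders</code> rows; <code class="language-plaintext highlighter-rouge">ceil_to_hour</code> and <code class="language-plaintext highlighter-rouge">tumbling_hourly</code> are hypothetical helpers, and the sketch assumes all timestamps fall within a single day:</p>
<figure class="highlight"><pre><code class="language-python" data-lang="python"># The sample Orders rows as (rowtime, productId, orderId, units) tuples.
ORDERS = [
    ("10:17:00", 30, 5, 4),
    ("10:17:05", 10, 6, 1),
    ("10:18:05", 20, 7, 2),
    ("10:18:07", 30, 8, 20),
    ("11:02:00", 10, 9, 6),
    ("11:04:00", 10, 10, 1),
    ("11:09:30", 40, 11, 12),
    ("11:24:11", 10, 12, 4),
]


def ceil_to_hour(rowtime):
    """CEIL(rowtime TO HOUR) for an "HH:MM:SS" string; exact hours are unchanged."""
    h, m, s = map(int, rowtime.split(":"))
    if m or s:
        h += 1
    return f"{h:02d}:00:00"


def tumbling_hourly(rows):
    """GROUP BY CEIL(rowtime TO HOUR), productId with COUNT(*) and SUM(units)."""
    totals = {}  # maps (window end, productId) to [count, units]
    for rowtime, product_id, _order_id, units in rows:
        acc = totals.setdefault((ceil_to_hour(rowtime), product_id), [0, 0])
        acc[0] += 1
        acc[1] += units
    return [(end, pid, c, u) for (end, pid), (c, u) in totals.items()]


for row in tumbling_hourly(ORDERS):
    print(row)</code></pre></figure>
<p>Each input row lands in exactly one (window, <code class="language-plaintext highlighter-rouge">productId</code>) group, which is what makes the window tumbling rather than hopping or sliding.</p>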
<p>How did Calcite know that the sub-totals for the hour ending at 11:00:00 were complete,
so that it could emit them? It knows that <code class="language-plaintext highlighter-rouge">rowtime</code> is increasing, and it knows
that <code class="language-plaintext highlighter-rouge">CEIL(rowtime TO HOUR)</code> is also increasing. So, once it has seen a row
after 11:00:00, it will never see another row that contributes to the 11:00:00
total.</p>
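<p>The reasoning above can be sketched in Python as an incremental operator: because the window key never decreases, the arrival of a row with a later key proves that the open window is complete. This is a simplified illustration under that assumption, not Calcite’s implementation; <code class="language-plaintext highlighter-rouge">emit_on_progress</code> is a hypothetical name.</p>
<figure class="highlight"><pre><code class="language-python" data-lang="python">def ceil_to_hour(rowtime):
    """CEIL(rowtime TO HOUR) for an "HH:MM:SS" string; exact hours are unchanged."""
    h, m, s = map(int, rowtime.split(":"))
    if m or s:
        h += 1
    return f"{h:02d}:00:00"


def emit_on_progress(rows):
    """Yields (window end, productId, count, units) as soon as each hour is complete.

    Assumes rowtime, and hence CEIL(rowtime TO HOUR), never decreases.
    """
    current_end = None
    totals = {}  # maps productId to [count, units] for the open window
    for rowtime, product_id, _order_id, units in rows:
        end = ceil_to_hour(rowtime)
        if end != current_end:
            # A later window has started, so no further row can reach the open one.
            for pid, (c, u) in totals.items():
                yield current_end, pid, c, u
            current_end, totals = end, {}
        acc = totals.setdefault(product_id, [0, 0])
        acc[0] += 1
        acc[1] += units
    # No final flush: on a real stream the last window stays open until a later row arrives.


rows = [
    ("10:17:00", 30, 5, 4),
    ("10:18:05", 20, 7, 2),
    ("11:02:00", 10, 9, 6),  # the first row after 11:00:00 releases the 11:00:00 totals
]
print(list(emit_on_progress(rows)))</code></pre></figure>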
<p>A column or expression that is increasing or decreasing is said to be
<em>monotonic</em>.</p>
<p>If a column or expression has values that are slightly out of order,
and the stream has a mechanism (such as punctuation or watermarks)
to declare that a particular value will never be seen again, then
the column or expression is said to be <em>quasi-monotonic</em>.</p>
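<p>As a rough illustration of quasi-monotonic progress, the following Python sketch accepts rows that may arrive slightly out of order, interleaved with watermark events. It assumes that a watermark carrying time <em>W</em> promises that no later row will have a <code class="language-plaintext highlighter-rouge">rowtime</code> at or before <em>W</em>, so every hourly window ending at or before <em>W</em> can be emitted. The event format, the name <code class="language-plaintext highlighter-rouge">emit_with_watermarks</code>, and the event sequence are all made up for illustration.</p>
<figure class="highlight"><pre><code class="language-python" data-lang="python">import bisect


def ceil_to_hour(rowtime):
    """CEIL(rowtime TO HOUR) for an "HH:MM:SS" string; exact hours are unchanged."""
    h, m, s = map(int, rowtime.split(":"))
    if m or s:
        h += 1
    return f"{h:02d}:00:00"


def emit_with_watermarks(events):
    """Events are ("row", rowtime, productId, units) or ("watermark", time).

    Yields (window end, productId, count, units) once a watermark closes the window.
    Times are zero-padded "HH:MM:SS" strings, so they sort chronologically.
    """
    ends = []    # sorted ends of windows that are still open
    totals = {}  # maps window end to {productId: [count, units]}
    for event in events:
        if event[0] == "row":
            _, rowtime, product_id, units = event
            end = ceil_to_hour(rowtime)
            if end not in totals:
                bisect.insort(ends, end)
                totals[end] = {}
            acc = totals[end].setdefault(product_id, [0, 0])
            acc[0] += 1
            acc[1] += units
        else:
            _, watermark = event
            # Number of open windows whose end is at or before the watermark.
            n = bisect.bisect_right(ends, watermark)
            for end in ends[:n]:
                for pid, (c, u) in totals.pop(end).items():
                    yield end, pid, c, u
            del ends[:n]


events = [
    ("row", "10:17:05", 10, 1),
    ("row", "10:17:00", 30, 4),   # slightly out of order
    ("watermark", "10:30:00"),    # too early to close the 11:00:00 window
    ("row", "10:59:00", 20, 2),
    ("watermark", "11:00:00"),    # closes the 11:00:00 window
    ("row", "11:02:00", 10, 6),
]
print(list(emit_with_watermarks(events)))</code></pre></figure>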
<p>Without a monotonic or quasi-monotonic expression in the <code class="language-plaintext highlighter-rouge">GROUP BY</code> clause,
Calcite is
not able to make progress, and it will not allow the query:</p>
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">SELECT</span> <span class="n">STREAM</span> <span class="n">productId</span><span class="p">,</span>
<span class="k">COUNT</span><span class="p">(</span><span class="o">*</span><span class="p">)</span> <span class="k">AS</span> <span class="k">c</span><span class="p">,</span>
<span class="k">SUM</span><span class="p">(</span><span class="n">units</span><span class="p">)</span> <span class="k">AS</span> <span class="n">units</span>
<span class="k">FROM</span> <span class="n">Orders</span>
<span class="k">GROUP</span> <span class="k">BY</span> <span class="n">productId</span><span class="p">;</span>
<span class="n">ERROR</span><span class="p">:</span> <span class="n">Streaming</span> <span class="n">aggregation</span> <span class="n">requires</span> <span class="k">at</span> <span class="n">least</span> <span class="n">one</span> <span class="n">monotonic</span> <span class="n">expression</span> <span class="k">in</span> <span class="k">GROUP</span> <span class="k">BY</span> <span class="n">clause</span></code></pre></figure>
<p>Monotonic and quasi-monotonic columns must be declared in the schema.
Monotonicity is enforced when records enter the stream and assumed by queries
that read from that stream. We recommend that you give each stream a timestamp
column called <code class="language-plaintext highlighter-rouge">rowtime</code>, but you can also declare other columns
to be monotonic, for example <code class="language-plaintext highlighter-rouge">orderId</code>.</p>
<p>We discuss punctuation, watermarks, and other ways of making progress
<a href="#punctuation">below</a>.</p>
<h2 id="tumbling-windows-improved">Tumbling windows, improved</h2>
<p>The previous example of tumbling windows was easy to write because the window
was one hour. For intervals that are not a whole time unit, say 2 hours or
2 hours and 17 minutes, you cannot use <code class="language-plaintext highlighter-rouge">CEIL</code>, and the expression gets more
complicated.</p>
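<p>The complication is the arithmetic itself: to place a timestamp in a window of arbitrary size, you subtract an origin, divide by the window size, round down, and add the origin back. A minimal Python sketch, assuming windows are laid out end to end from the Unix epoch (an assumption for illustration); <code class="language-plaintext highlighter-rouge">window_start</code> and <code class="language-plaintext highlighter-rouge">window_end</code> are hypothetical helpers, not Calcite functions:</p>
<figure class="highlight"><pre><code class="language-python" data-lang="python">from datetime import datetime, timedelta

# Assumed origin from which windows are laid out end to end.
EPOCH = datetime(1970, 1, 1)


def window_start(ts, size):
    """Round ts down to the start of its fixed-size window, counting from EPOCH."""
    # timedelta // timedelta gives a whole number of windows; multiply back by size.
    return EPOCH + (ts - EPOCH) // size * size


def window_end(ts, size):
    """End of the window containing ts (a window includes its start, not its end)."""
    return window_start(ts, size) + size


size = timedelta(hours=2, minutes=17)
ts = datetime(1970, 1, 1, 5, 0)
print(window_start(ts, size), window_end(ts, size))
# prints 1970-01-01 04:34:00 1970-01-01 06:51:00</code></pre></figure>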
<p>Calcite supports an alternative syntax for tumbling windows:</p>
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">SELECT</span> <span class="n">STREAM</span> <span class="n">TUMBLE_END</span><span class="p">(</span><span class="n">rowtime</span><span class="p">,</span> <span class="n">INTERVAL</span> <span class="s1">'1'</span> <span class="n">HOUR</span><span class="p">)</span> <span class="k">AS</span> <span class="n">rowtime</span><span class="p">,</span>
<span class="n">productId</span><span class="p">,</span>
<span class="k">COUNT</span><span class="p">(</span><span class="o">*</span><span class="p">)</span> <span class="k">AS</span> <span class="k">c</span><span class="p">,</span>
<span class="k">SUM</span><span class="p">(</span><span class="n">units</span><span class="p">)</span> <span class="k">AS</span> <span class="n">units</span>
<span class="k">FROM</span> <span class="n">Orders</span>
<span class="k">GROUP</span> <span class="k">BY</span> <span class="n">TUMBLE</span><span class="p">(</span><span class="n">rowtime</span><span class="p">,</span> <span class="n">INTERVAL</span> <span class="s1">'1'</span> <span class="n">HOUR</span><span class="p">),</span> <span class="n">productId</span><span class="p">;</span>
<span class="n">rowtime</span> <span class="o">|</span> <span class="n">productId</span> <span class="o">|</span> <span class="k">c</span> <span class="o">|</span> <span class="n">units</span>
<span class="c1">----------+-----------+---------+-------</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">00</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">30</span> <span class="o">|</span> <span class="mi">2</span> <span class="o">|</span> <span class="mi">24</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">00</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">1</span> <span class="o">|</span> <span class="mi">1</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">00</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">20</span> <span class="o">|</span> <span class="mi">1</span> <span class="o">|</span> <span class="mi">2</span>
<span class="mi">12</span><span class="p">:</span><span class="mi">00</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">3</span> <span class="o">|</span> <span class="mi">11</span>
<span class="mi">12</span><span class="p">:</span><span class="mi">00</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">40</span> <span class="o">|</span> <span class="mi">1</span> <span class="o">|</span> <span class="mi">12</span></code></pre></figure>
<p>As you can see, it returns the same results as the previous query. The <code class="language-plaintext highlighter-rouge">TUMBLE</code>
function returns a grouping key that is the same for all the rows that will end
up in a given summary row; the <code class="language-plaintext highlighter-rouge">TUMBLE_END</code> function takes the same arguments
and returns the time at which that window ends;
there is also a <code class="language-plaintext highlighter-rouge">TUMBLE_START</code> function.</p>
<p><code class="language-plaintext highlighter-rouge">TUMBLE</code> has an optional parameter to align the window.
In the following example, we use a 30-minute interval and 0:12 as the alignment time,
so the query emits summaries at 12 and 42 minutes past each hour:</p>
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">SELECT</span> <span class="n">STREAM</span>
<span class="n">TUMBLE_END</span><span class="p">(</span><span class="n">rowtime</span><span class="p">,</span> <span class="n">INTERVAL</span> <span class="s1">'30'</span> <span class="k">MINUTE</span><span class="p">,</span> <span class="nb">TIME</span> <span class="s1">'0:12'</span><span class="p">)</span> <span class="k">AS</span> <span class="n">rowtime</span><span class="p">,</span>
<span class="n">productId</span><span class="p">,</span>
<span class="k">COUNT</span><span class="p">(</span><span class="o">*</span><span class="p">)</span> <span class="k">AS</span> <span class="k">c</span><span class="p">,</span>
<span class="k">SUM</span><span class="p">(</span><span class="n">units</span><span class="p">)</span> <span class="k">AS</span> <span class="n">units</span>
<span class="k">FROM</span> <span class="n">Orders</span>
<span class="k">GROUP</span> <span class="k">BY</span> <span class="n">TUMBLE</span><span class="p">(</span><span class="n">rowtime</span><span class="p">,</span> <span class="n">INTERVAL</span> <span class="s1">'30'</span> <span class="k">MINUTE</span><span class="p">,</span> <span class="nb">TIME</span> <span class="s1">'0:12'</span><span class="p">),</span>
<span class="n">productId</span><span class="p">;</span>
<span class="n">rowtime</span> <span class="o">|</span> <span class="n">productId</span> <span class="o">|</span> <span class="k">c</span> <span class="o">|</span> <span class="n">units</span>
<span class="c1">----------+-----------+---------+-------</span>
<span class="mi">10</span><span class="p">:</span><span class="mi">42</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">30</span> <span class="o">|</span> <span class="mi">2</span> <span class="o">|</span> <span class="mi">24</span>
<span class="mi">10</span><span class="p">:</span><span class="mi">42</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">1</span> <span class="o">|</span> <span class="mi">1</span>
<span class="mi">10</span><span class="p">:</span><span class="mi">42</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">20</span> <span class="o">|</span> <span class="mi">1</span> <span class="o">|</span> <span class="mi">2</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">12</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">2</span> <span class="o">|</span> <span class="mi">7</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">12</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">40</span> <span class="o">|</span> <span class="mi">1</span> <span class="o">|</span> <span class="mi">12</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">42</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">1</span> <span class="o">|</span> <span class="mi">4</span></code></pre></figure>
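<p>The window arithmetic behind <code class="language-plaintext highlighter-rouge">TUMBLE</code>, <code class="language-plaintext highlighter-rouge">TUMBLE_START</code> and <code class="language-plaintext highlighter-rouge">TUMBLE_END</code> with an alignment time can be sketched in Python. This is a minimal illustration under stated assumptions, not Calcite's implementation: the helpers <code class="language-plaintext highlighter-rouge">hms</code>, <code class="language-plaintext highlighter-rouge">fmt</code>, <code class="language-plaintext highlighter-rouge">tumble</code> and <code class="language-plaintext highlighter-rouge">aligned_summary</code> are hypothetical, times are seconds since midnight, and <code class="language-plaintext highlighter-rouge">ORDERS</code> holds the sample <code class="language-plaintext highlighter-rouge">Orders</code> rows used on this page as (rowtime, productId, orderId, units).</p>
<figure class="highlight"><pre><code class="language-python" data-lang="python">from collections import defaultdict

def hms(s):
    """Parse 'H:MM' or 'HH:MM:SS' into seconds since midnight."""
    h, m, sec = ([int(p) for p in s.split(":")] + [0])[:3]
    return h * 3600 + m * 60 + sec

def fmt(t):
    """Format seconds since midnight as HH:MM:SS."""
    return f"{t // 3600:02d}:{t % 3600 // 60:02d}:{t % 60:02d}"

def tumble(ts, size, align=0):
    """Return (start, end) of the window containing ts.

    Windows are [align + k * size, align + (k + 1) * size) for integer k,
    so start plays the role of TUMBLE_START and end of TUMBLE_END.
    """
    start = align + (ts - align) // size * size
    return start, start + size

# Sample Orders rows: (rowtime, productId, orderId, units).
ORDERS = [
    ("10:17:00", 30, 5, 4), ("10:17:05", 10, 6, 1),
    ("10:18:05", 20, 7, 2), ("10:18:07", 30, 8, 20),
    ("11:02:00", 10, 9, 6), ("11:04:00", 10, 10, 1),
    ("11:09:30", 40, 11, 12), ("11:24:11", 10, 12, 4),
]

def aligned_summary(rows, size, align):
    """COUNT(*) and SUM(units) per (TUMBLE_END, productId)."""
    acc = defaultdict(lambda: [0, 0])
    for rowtime, product_id, _order_id, units in rows:
        _start, end = tumble(hms(rowtime), size, align)
        acc[(fmt(end), product_id)][0] += 1
        acc[(fmt(end), product_id)][1] += units
    return {key: tuple(v) for key, v in acc.items()}

summary = aligned_summary(ORDERS, size=30 * 60, align=hms("0:12"))
for (end, product_id), (c, units) in sorted(summary.items()):
    print(end, product_id, c, units)</code></pre></figure>
<p>With a 30-minute size and a 0:12 alignment, a row at 10:17:00 falls in the window from 10:12:00 to 10:42:00, so its window start is 10:12:00 and its window end 10:42:00. The sketch prints six summary rows; product 20 has a single 2-unit order in the window ending at 10:42:00.</p>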
<h1 id="hopping-windows">Hopping windows</h1>
<p>Hopping windows are a generalization of tumbling windows that allow data to
be kept in a window for longer than the emit interval.</p>
<p>For example, the following query emits a row timestamped 11:00 containing data
from 08:00 to 11:00 (or 10:59.9 if we’re being pedantic),
and a row timestamped 12:00 containing data from 09:00
to 12:00.</p>
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">SELECT</span> <span class="n">STREAM</span>
<span class="n">HOP_END</span><span class="p">(</span><span class="n">rowtime</span><span class="p">,</span> <span class="n">INTERVAL</span> <span class="s1">'1'</span> <span class="n">HOUR</span><span class="p">,</span> <span class="n">INTERVAL</span> <span class="s1">'3'</span> <span class="n">HOUR</span><span class="p">)</span> <span class="k">AS</span> <span class="n">rowtime</span><span class="p">,</span>
<span class="k">COUNT</span><span class="p">(</span><span class="o">*</span><span class="p">)</span> <span class="k">AS</span> <span class="k">c</span><span class="p">,</span>
<span class="k">SUM</span><span class="p">(</span><span class="n">units</span><span class="p">)</span> <span class="k">AS</span> <span class="n">units</span>
<span class="k">FROM</span> <span class="n">Orders</span>
<span class="k">GROUP</span> <span class="k">BY</span> <span class="n">HOP</span><span class="p">(</span><span class="n">rowtime</span><span class="p">,</span> <span class="n">INTERVAL</span> <span class="s1">'1'</span> <span class="n">HOUR</span><span class="p">,</span> <span class="n">INTERVAL</span> <span class="s1">'3'</span> <span class="n">HOUR</span><span class="p">);</span>
<span class="n">rowtime</span> <span class="o">|</span> <span class="k">c</span> <span class="o">|</span> <span class="n">units</span>
<span class="c1">----------+----------+-------</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">00</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">4</span> <span class="o">|</span> <span class="mi">27</span>
<span class="mi">12</span><span class="p">:</span><span class="mi">00</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">8</span> <span class="o">|</span> <span class="mi">50</span></code></pre></figure>
<p>In this query, because the retain period is 3 times the emit period, every input
row contributes to exactly 3 output rows. Imagine that the <code class="language-plaintext highlighter-rouge">HOP</code> function
generates a collection of group keys for each incoming row, and adds the row's values
to the accumulators of each of those group keys. For example,
<code class="language-plaintext highlighter-rouge">HOP(10:18:00, INTERVAL '1' HOUR, INTERVAL '3' HOUR)</code> generates 3 periods:</p>
<figure class="highlight"><pre><code class="language-plaintext" data-lang="plaintext">[08:00, 11:00)
[09:00, 12:00)
[10:00, 13:00)</code></pre></figure>
<p>This raises the possibility of allowing user-defined partitioning functions
for users who are not happy with the built-in functions <code class="language-plaintext highlighter-rouge">HOP</code> and <code class="language-plaintext highlighter-rouge">TUMBLE</code>.</p>
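<p>The idea of a partitioning function that maps each row to a collection of group keys can be sketched in Python as a function from a rowtime to the list of windows containing it; <code class="language-plaintext highlighter-rouge">TUMBLE</code> and <code class="language-plaintext highlighter-rouge">HOP</code> are then two interchangeable instances, and the aggregation driver does not care which one it is given. This is a hypothetical illustration, not a Calcite API: <code class="language-plaintext highlighter-rouge">tumble_fn</code>, <code class="language-plaintext highlighter-rouge">hop_fn</code> and <code class="language-plaintext highlighter-rouge">aggregate</code> are invented names, times are seconds since midnight, and <code class="language-plaintext highlighter-rouge">ROWS</code> holds (rowtime, units) for the sample <code class="language-plaintext highlighter-rouge">Orders</code> rows used on this page.</p>
<figure class="highlight"><pre><code class="language-python" data-lang="python">from collections import defaultdict

def hms(s):
    """Parse 'HH:MM:SS' into seconds since midnight."""
    h, m, sec = (int(p) for p in s.split(":"))
    return h * 3600 + m * 60 + sec

def tumble_fn(size):
    """Partitioning function: each row falls in exactly one window."""
    def windows(ts):
        start = ts // size * size
        return [(start, start + size)]
    return windows

def hop_fn(slide, size):
    """Partitioning function: every window [s, s + size), with s a multiple
    of slide, that contains ts."""
    def windows(ts):
        # First start: the smallest multiple of slide greater than ts - size.
        first = ((ts - size) // slide + 1) * slide
        last = ts // slide * slide
        return [(s, s + size) for s in range(first, last + 1, slide)]
    return windows

def aggregate(rows, partition):
    """COUNT(*) and SUM(units) per window end, for any partitioning function."""
    acc = defaultdict(lambda: [0, 0])
    for ts, units in rows:
        for _start, end in partition(ts):   # one accumulator per group key
            acc[end][0] += 1
            acc[end][1] += units
    return {end: tuple(v) for end, v in sorted(acc.items())}

# (rowtime, units) for the sample Orders rows used on this page.
ROWS = [(hms(t), u) for t, u in [
    ("10:17:00", 4), ("10:17:05", 1), ("10:18:05", 2), ("10:18:07", 20),
    ("11:02:00", 6), ("11:04:00", 1), ("11:09:30", 12), ("11:24:11", 4)]]

hop = aggregate(ROWS, hop_fn(slide=3600, size=3 * 3600))
hourly = aggregate(ROWS, tumble_fn(3600))
print(hop_fn(3600, 3 * 3600)(hms("10:18:00")))   # three 3-hour windows</code></pre></figure>
<p>The hopping aggregate gives 4 orders and 27 units for the window ending at 11:00, and 8 orders and 50 units for the window ending at 12:00. Because the sketch sees the whole sample at once, it also produces windows ending at 13:00 and 14:00.</p>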
<p>We can build complex expressions such as an exponentially decaying
moving average:</p>
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">SELECT</span> <span class="n">STREAM</span> <span class="n">HOP_END</span><span class="p">(</span><span class="n">rowtime</span><span class="p">,</span> <span class="n">INTERVAL</span> <span class="s1">'1'</span> <span class="k">SECOND</span><span class="p">,</span> <span class="n">INTERVAL</span> <span class="s1">'1'</span> <span class="n">HOUR</span><span class="p">)</span> <span class="k">AS</span> <span class="n">rowtime</span><span class="p">,</span>
<span class="n">productId</span><span class="p">,</span>
<span class="k">SUM</span><span class="p">(</span><span class="n">unitPrice</span> <span class="o">*</span> <span class="n">EXP</span><span class="p">((</span><span class="n">rowtime</span> <span class="o">-</span> <span class="n">HOP_START</span><span class="p">(</span><span class="n">rowtime</span><span class="p">,</span> <span class="n">INTERVAL</span> <span class="s1">'1'</span> <span class="k">SECOND</span><span class="p">,</span> <span class="n">INTERVAL</span> <span class="s1">'1'</span> <span class="n">HOUR</span><span class="p">))</span> <span class="k">SECOND</span> <span class="o">/</span> <span class="n">INTERVAL</span> <span class="s1">'1'</span> <span class="n">HOUR</span><span class="p">))</span>
<span class="o">/</span> <span class="k">SUM</span><span class="p">(</span><span class="n">EXP</span><span class="p">((</span><span class="n">rowtime</span> <span class="o">-</span> <span class="n">HOP_START</span><span class="p">(</span><span class="n">rowtime</span><span class="p">,</span> <span class="n">INTERVAL</span> <span class="s1">'1'</span> <span class="k">SECOND</span><span class="p">,</span> <span class="n">INTERVAL</span> <span class="s1">'1'</span> <span class="n">HOUR</span><span class="p">))</span> <span class="k">SECOND</span> <span class="o">/</span> <span class="n">INTERVAL</span> <span class="s1">'1'</span> <span class="n">HOUR</span><span class="p">))</span>
<span class="k">FROM</span> <span class="n">Orders</span>
<span class="k">GROUP</span> <span class="k">BY</span> <span class="n">HOP</span><span class="p">(</span><span class="n">rowtime</span><span class="p">,</span> <span class="n">INTERVAL</span> <span class="s1">'1'</span> <span class="k">SECOND</span><span class="p">,</span> <span class="n">INTERVAL</span> <span class="s1">'1'</span> <span class="n">HOUR</span><span class="p">),</span>
<span class="n">productId</span><span class="p">;</span></code></pre></figure>
<p>Emits:</p>
<ul>
<li>a row at <code class="language-plaintext highlighter-rouge">11:00:00</code> containing rows in <code class="language-plaintext highlighter-rouge">[10:00:00, 11:00:00)</code>;</li>
<li>a row at <code class="language-plaintext highlighter-rouge">11:00:01</code> containing rows in <code class="language-plaintext highlighter-rouge">[10:00:01, 11:00:01)</code>.</li>
</ul>
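<p>The expression weights recent orders more heavily than older orders: an order
placed one hour earlier than another receives 1/e of its weight. Because the
weights decay exponentially, once the window spans several decay periods,
extending it further (say, to 1 year) would have virtually no effect on the
accuracy of the result, but would use more memory and compute.</p>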
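<p>The weighting can be checked numerically with a small Python sketch. It is a hypothetical illustration, not Calcite code: <code class="language-plaintext highlighter-rouge">decayed_average</code> is an invented helper, and since the sample <code class="language-plaintext highlighter-rouge">Orders</code> rows have no <code class="language-plaintext highlighter-rouge">unitPrice</code> column, the prices below are made up. One deliberate difference from the SQL: the exponent is measured from the window end rather than from <code class="language-plaintext highlighter-rouge">HOP_START</code>. That only multiplies every weight by the same constant, which cancels in the ratio, but it keeps the exponent at or below zero so that long windows cannot overflow <code class="language-plaintext highlighter-rouge">EXP</code>.</p>
<figure class="highlight"><pre><code class="language-python" data-lang="python">import math

def decayed_average(rows, window_end, decay_seconds=3600):
    """Exponentially weighted average price of (rowtime, price) rows.

    Each row gets weight exp((rowtime - window_end) / decay_seconds), so a
    row one decay period older than another gets 1/e of its weight.
    """
    weights = [math.exp((ts - window_end) / decay_seconds) for ts, _ in rows]
    weighted = sum(w * price for w, (_, price) in zip(weights, rows))
    return weighted / sum(weights)

# Hypothetical prices: 10.0 one hour before the window end, 20.0 at the end.
recent = [(0, 10.0), (3600, 20.0)]
print(decayed_average(recent, window_end=3600))   # closer to 20 than to 10

# A row 20 hours older than the window end barely moves the result.
print(decayed_average([(3600 - 20 * 3600, 10.0)] + recent, window_end=3600))</code></pre></figure>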
<p>Note that we can use <code class="language-plaintext highlighter-rouge">HOP_START</code> inside an aggregate function (<code class="language-plaintext highlighter-rouge">SUM</code>) because its
value is constant for all rows within a sub-total. This would not be allowed for
typical aggregate functions (<code class="language-plaintext highlighter-rouge">SUM</code>, <code class="language-plaintext highlighter-rouge">COUNT</code>,
etc.), which cannot be nested inside another aggregate function.</p>
<p>If you are familiar with <code class="language-plaintext highlighter-rouge">GROUPING SETS</code>, you may notice that partitioning
functions can be seen as a generalization of <code class="language-plaintext highlighter-rouge">GROUPING SETS</code>, in that they
allow an input row to contribute to multiple sub-totals.
The auxiliary functions for <code class="language-plaintext highlighter-rouge">GROUPING SETS</code>,
such as <code class="language-plaintext highlighter-rouge">GROUPING()</code> and <code class="language-plaintext highlighter-rouge">GROUP_ID()</code>,
can be used inside aggregate functions, so it is not surprising that
<code class="language-plaintext highlighter-rouge">HOP_START</code> and <code class="language-plaintext highlighter-rouge">HOP_END</code> can be used in the same way.</p>
<h1 id="grouping-sets">GROUPING SETS</h1>
<p><code class="language-plaintext highlighter-rouge">GROUPING SETS</code> is valid for a streaming query provided that every
grouping set contains a monotonic or quasi-monotonic expression.</p>
<p><code class="language-plaintext highlighter-rouge">CUBE</code> and <code class="language-plaintext highlighter-rouge">ROLLUP</code> are not valid for streaming queries, because they
produce at least one grouping set that aggregates everything (like
<code class="language-plaintext highlighter-rouge">GROUP BY ()</code>).</p>
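<p>The rule can be illustrated by expanding <code class="language-plaintext highlighter-rouge">ROLLUP</code> and <code class="language-plaintext highlighter-rouge">CUBE</code> into their grouping sets and checking each set for a monotonic expression. In this hypothetical Python sketch the caller supplies the set of monotonic (or quasi-monotonic) expressions, here a stand-in named <code class="language-plaintext highlighter-rouge">hour</code> for something like <code class="language-plaintext highlighter-rouge">FLOOR(rowtime TO HOUR)</code>; deciding monotonicity is the planner's job and is not modeled.</p>
<figure class="highlight"><pre><code class="language-python" data-lang="python">from itertools import combinations

def rollup(cols):
    """ROLLUP(a, b) expands to the grouping sets (a, b), (a), ()."""
    return [tuple(cols[:i]) for i in range(len(cols), -1, -1)]

def cube(cols):
    """CUBE(a, b) expands to every subset: (a, b), (a), (b), ()."""
    return [s for n in range(len(cols), -1, -1) for s in combinations(cols, n)]

def valid_for_streaming(grouping_sets, monotonic):
    """True if every grouping set contains a (quasi-)monotonic expression."""
    return all(any(col in monotonic for col in gs) for gs in grouping_sets)

MONOTONIC = {"hour"}   # hypothetical: stands for FLOOR(rowtime TO HOUR)
print(valid_for_streaming([("hour", "productId"), ("hour",)], MONOTONIC))  # True
print(valid_for_streaming(rollup(["hour", "productId"]), MONOTONIC))       # False</code></pre></figure>
<p>The empty grouping set <code class="language-plaintext highlighter-rouge">()</code> that both expansions produce contains no expression at all, so it can never satisfy the check.</p>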
<h1 id="filtering-after-aggregation">Filtering after aggregation</h1>
<p>As in standard SQL, you can apply a <code class="language-plaintext highlighter-rouge">HAVING</code> clause to filter rows emitted by
a streaming <code class="language-plaintext highlighter-rouge">GROUP BY</code>:</p>
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">SELECT</span> <span class="n">STREAM</span> <span class="n">TUMBLE_END</span><span class="p">(</span><span class="n">rowtime</span><span class="p">,</span> <span class="n">INTERVAL</span> <span class="s1">'1'</span> <span class="n">HOUR</span><span class="p">)</span> <span class="k">AS</span> <span class="n">rowtime</span><span class="p">,</span>
<span class="n">productId</span>
<span class="k">FROM</span> <span class="n">Orders</span>
<span class="k">GROUP</span> <span class="k">BY</span> <span class="n">TUMBLE</span><span class="p">(</span><span class="n">rowtime</span><span class="p">,</span> <span class="n">INTERVAL</span> <span class="s1">'1'</span> <span class="n">HOUR</span><span class="p">),</span> <span class="n">productId</span>
<span class="k">HAVING</span> <span class="k">COUNT</span><span class="p">(</span><span class="o">*</span><span class="p">)</span> <span class="o">&gt;</span> <span class="mi">2</span> <span class="k">OR</span> <span class="k">SUM</span><span class="p">(</span><span class="n">units</span><span class="p">)</span> <span class="o">&gt;</span> <span class="mi">10</span><span class="p">;</span>
<span class="n">rowtime</span> <span class="o">|</span> <span class="n">productId</span>
<span class="c1">----------+-----------</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">00</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">30</span>
<span class="mi">12</span><span class="p">:</span><span class="mi">00</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">10</span>
<span class="mi">12</span><span class="p">:</span><span class="mi">00</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">40</span></code></pre></figure>
<h1 id="sub-queries-views-and-sqls-closure-property">Sub-queries, views and SQL’s closure property</h1>
<p>The previous <code class="language-plaintext highlighter-rouge">HAVING</code> query can be expressed using a <code class="language-plaintext highlighter-rouge">WHERE</code> clause on a
sub-query:</p>
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">SELECT</span> <span class="n">STREAM</span> <span class="n">rowtime</span><span class="p">,</span> <span class="n">productId</span>
<span class="k">FROM</span> <span class="p">(</span>
<span class="k">SELECT</span> <span class="n">TUMBLE_END</span><span class="p">(</span><span class="n">rowtime</span><span class="p">,</span> <span class="n">INTERVAL</span> <span class="s1">'1'</span> <span class="n">HOUR</span><span class="p">)</span> <span class="k">AS</span> <span class="n">rowtime</span><span class="p">,</span>
<span class="n">productId</span><span class="p">,</span>
<span class="k">COUNT</span><span class="p">(</span><span class="o">*</span><span class="p">)</span> <span class="k">AS</span> <span class="k">c</span><span class="p">,</span>
<span class="k">SUM</span><span class="p">(</span><span class="n">units</span><span class="p">)</span> <span class="k">AS</span> <span class="n">su</span>
<span class="k">FROM</span> <span class="n">Orders</span>
<span class="k">GROUP</span> <span class="k">BY</span> <span class="n">TUMBLE</span><span class="p">(</span><span class="n">rowtime</span><span class="p">,</span> <span class="n">INTERVAL</span> <span class="s1">'1'</span> <span class="n">HOUR</span><span class="p">),</span> <span class="n">productId</span><span class="p">)</span>
<span class="k">WHERE</span> <span class="k">c</span> <span class="o">&gt;</span> <span class="mi">2</span> <span class="k">OR</span> <span class="n">su</span> <span class="o">&gt;</span> <span class="mi">10</span><span class="p">;</span>
<span class="n">rowtime</span> <span class="o">|</span> <span class="n">productId</span>
<span class="c1">----------+-----------</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">00</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">30</span>
<span class="mi">12</span><span class="p">:</span><span class="mi">00</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">10</span>
<span class="mi">12</span><span class="p">:</span><span class="mi">00</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">40</span></code></pre></figure>
<p><code class="language-plaintext highlighter-rouge">HAVING</code> was introduced in the early days of SQL, when a way was needed to
perform a filter <em>after</em> aggregation. (Recall that <code class="language-plaintext highlighter-rouge">WHERE</code> filters rows before
they enter the <code class="language-plaintext highlighter-rouge">GROUP BY</code> clause.)</p>
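<p>The difference between the two filters can be sketched in Python. This is a hypothetical illustration, not Calcite code: <code class="language-plaintext highlighter-rouge">hourly_totals</code> applies an optional <code class="language-plaintext highlighter-rouge">WHERE</code> predicate to raw rows before grouping by one-hour window end and product, and <code class="language-plaintext highlighter-rouge">having</code> filters the resulting sub-totals. Times are seconds since midnight and <code class="language-plaintext highlighter-rouge">ORDERS</code> holds (rowtime, productId, units) for the sample rows used on this page.</p>
<figure class="highlight"><pre><code class="language-python" data-lang="python">from collections import defaultdict

def hms(s):
    """Parse 'HH:MM:SS' into seconds since midnight."""
    h, m, sec = (int(p) for p in s.split(":"))
    return h * 3600 + m * 60 + sec

# (rowtime, productId, units) for the sample Orders rows used on this page.
ORDERS = [(hms(t), p, u) for t, p, u in [
    ("10:17:00", 30, 4), ("10:17:05", 10, 1), ("10:18:05", 20, 2),
    ("10:18:07", 30, 20), ("11:02:00", 10, 6), ("11:04:00", 10, 1),
    ("11:09:30", 40, 12), ("11:24:11", 10, 4)]]

def hourly_totals(rows, where=lambda row: True):
    """GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), productId."""
    groups = defaultdict(lambda: [0, 0])
    for row in rows:
        if not where(row):              # WHERE: drops rows before aggregation
            continue
        ts, product_id, units = row
        end = ts // 3600 * 3600 + 3600  # end of the row's 1-hour window
        groups[(end, product_id)][0] += 1
        groups[(end, product_id)][1] += units
    return {key: tuple(v) for key, v in groups.items()}

def having(groups, predicate):
    """HAVING: drops aggregated rows after aggregation."""
    return sorted(key for key, (c, su) in groups.items() if predicate(c, su))

def big(c, su):
    return c > 2 or su > 10

print(having(hourly_totals(ORDERS), big))
# Filtering out 1-unit orders first changes the sub-totals that HAVING sees.
print(having(hourly_totals(ORDERS, where=lambda row: row[2] > 1), big))</code></pre></figure>
<p>Labeling each group by the end of its window, the first filter keeps product 30 in the window ending at 11:00 and products 10 and 40 in the window ending at 12:00. With the extra <code class="language-plaintext highlighter-rouge">WHERE</code>, product 10 drops out: its remaining sub-total (2 orders, 10 units) no longer satisfies the <code class="language-plaintext highlighter-rouge">HAVING</code> condition.</p>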
<p>Since then, SQL has become a mathematically closed language, which means that
any operation you can perform on a table you can also perform on a query.</p>
<p>The <em>closure property</em> of SQL is extremely powerful. Not only does it render
<code class="language-plaintext highlighter-rouge">HAVING</code> obsolete (or, at least, reduce it to syntactic sugar), it makes views
possible:</p>
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">CREATE</span> <span class="k">VIEW</span> <span class="n">HourlyOrderTotals</span> <span class="p">(</span><span class="n">rowtime</span><span class="p">,</span> <span class="n">productId</span><span class="p">,</span> <span class="k">c</span><span class="p">,</span> <span class="n">su</span><span class="p">)</span> <span class="k">AS</span>
<span class="k">SELECT</span> <span class="n">TUMBLE_END</span><span class="p">(</span><span class="n">rowtime</span><span class="p">,</span> <span class="n">INTERVAL</span> <span class="s1">'1'</span> <span class="n">HOUR</span><span class="p">),</span>
<span class="n">productId</span><span class="p">,</span>
<span class="k">COUNT</span><span class="p">(</span><span class="o">*</span><span class="p">),</span>
<span class="k">SUM</span><span class="p">(</span><span class="n">units</span><span class="p">)</span>
<span class="k">FROM</span> <span class="n">Orders</span>
<span class="k">GROUP</span> <span class="k">BY</span> <span class="n">TUMBLE</span><span class="p">(</span><span class="n">rowtime</span><span class="p">,</span> <span class="n">INTERVAL</span> <span class="s1">'1'</span> <span class="n">HOUR</span><span class="p">),</span> <span class="n">productId</span><span class="p">;</span>
<span class="k">SELECT</span> <span class="n">STREAM</span> <span class="n">rowtime</span><span class="p">,</span> <span class="n">productId</span>
<span class="k">FROM</span> <span class="n">HourlyOrderTotals</span>
<span class="k">WHERE</span> <span class="k">c</span> <span class="o">&gt;</span> <span class="mi">2</span> <span class="k">OR</span> <span class="n">su</span> <span class="o">&gt;</span> <span class="mi">10</span><span class="p">;</span>
<span class="n">rowtime</span> <span class="o">|</span> <span class="n">productId</span>
<span class="c1">----------+-----------</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">00</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">30</span>
<span class="mi">12</span><span class="p">:</span><span class="mi">00</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">10</span>
<span class="mi">12</span><span class="p">:</span><span class="mi">00</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">40</span></code></pre></figure>
<p>Sub-queries in the <code class="language-plaintext highlighter-rouge">FROM</code> clause are sometimes referred to as “inline views”,
but really, they are more fundamental than views. Views are just a convenient
way to carve your SQL into manageable chunks by giving the pieces names and
storing them in the metadata repository.</p>
<p>Many people find that nested queries and views are even more useful on streams
than they are on relations. Streaming queries are pipelines of
operators all running continuously, and often those pipelines get quite long.
Nested queries and views help to express and manage those pipelines.</p>
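<p>Python generators give a compact model of such a pipeline: each operator pulls rows from the one before it, nothing is materialized, and the first result appears as soon as the first hourly window closes. This is a hedged sketch, not how Calcite executes streams: <code class="language-plaintext highlighter-rouge">tumble_aggregate</code>, <code class="language-plaintext highlighter-rouge">where</code> and <code class="language-plaintext highlighter-rouge">source</code> are hypothetical, times are seconds since midnight, and the sketch assumes rows arrive in rowtime order so that a window is complete once a row from a later window shows up.</p>
<figure class="highlight"><pre><code class="language-python" data-lang="python">from collections import defaultdict

def hms(s):
    """Parse 'HH:MM:SS' into seconds since midnight."""
    h, m, sec = (int(p) for p in s.split(":"))
    return h * 3600 + m * 60 + sec

# (rowtime, productId, units) for the sample Orders rows, in rowtime order.
ORDERS = [(hms(t), p, u) for t, p, u in [
    ("10:17:00", 30, 4), ("10:17:05", 10, 1), ("10:18:05", 20, 2),
    ("10:18:07", 30, 20), ("11:02:00", 10, 6), ("11:04:00", 10, 1),
    ("11:09:30", 40, 12), ("11:24:11", 10, 4)]]

def tumble_aggregate(rows, size):
    """Streaming GROUP BY TUMBLE(rowtime, size), productId.

    Yields (windowEnd, productId, c, su) as soon as each window closes.
    """
    current_end, groups = None, defaultdict(lambda: [0, 0])
    for ts, product_id, units in rows:
        end = ts // size * size + size
        if current_end is not None and end != current_end:
            for pid, (c, su) in sorted(groups.items()):
                yield current_end, pid, c, su
            groups.clear()
        current_end = end
        groups[product_id][0] += 1
        groups[product_id][1] += units
    # End of input: only reached if the stream is finite.
    for pid, (c, su) in sorted(groups.items()):
        yield current_end, pid, c, su

def where(rows, predicate):
    """Streaming filter: passes rows through one at a time."""
    return (row for row in rows if predicate(row))

consumed = []

def source():
    """Stand-in for a live stream; records how many rows have been read."""
    for row in ORDERS:
        consumed.append(row)
        yield row

pipeline = where(tumble_aggregate(source(), 3600),
                 lambda r: r[2] > 2 or r[3] > 10)
first = next(pipeline)   # window ending 11:00, product 30: 2 orders, 24 units
print(first, len(consumed))</code></pre></figure>
<p>Only five input rows have been read when the first result comes out: the row at 11:02:00 is what tells the aggregator that the window ending at 11:00 is complete.</p>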
<p>And, by the way, a <code class="language-plaintext highlighter-rouge">WITH</code> clause can accomplish the same as a sub-query or
a view:</p>
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">WITH</span> <span class="n">HourlyOrderTotals</span> <span class="p">(</span><span class="n">rowtime</span><span class="p">,</span> <span class="n">productId</span><span class="p">,</span> <span class="k">c</span><span class="p">,</span> <span class="n">su</span><span class="p">)</span> <span class="k">AS</span> <span class="p">(</span>
<span class="k">SELECT</span> <span class="n">TUMBLE_END</span><span class="p">(</span><span class="n">rowtime</span><span class="p">,</span> <span class="n">INTERVAL</span> <span class="s1">'1'</span> <span class="n">HOUR</span><span class="p">),</span>
<span class="n">productId</span><span class="p">,</span>
<span class="k">COUNT</span><span class="p">(</span><span class="o">*</span><span class="p">),</span>
<span class="k">SUM</span><span class="p">(</span><span class="n">units</span><span class="p">)</span>
<span class="k">FROM</span> <span class="n">Orders</span>
<span class="k">GROUP</span> <span class="k">BY</span> <span class="n">TUMBLE</span><span class="p">(</span><span class="n">rowtime</span><span class="p">,</span> <span class="n">INTERVAL</span> <span class="s1">'1'</span> <span class="n">HOUR</span><span class="p">),</span> <span class="n">productId</span><span class="p">)</span>
<span class="k">SELECT</span> <span class="n">STREAM</span> <span class="n">rowtime</span><span class="p">,</span> <span class="n">productId</span>
<span class="k">FROM</span> <span class="n">HourlyOrderTotals</span>
<span class="k">WHERE</span> <span class="k">c</span> <span class="o">&gt;</span> <span class="mi">2</span> <span class="k">OR</span> <span class="n">su</span> <span class="o">&gt;</span> <span class="mi">10</span><span class="p">;</span>
<span class="n">rowtime</span> <span class="o">|</span> <span class="n">productId</span>
<span class="c1">----------+-----------</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">00</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">30</span>
<span class="mi">12</span><span class="p">:</span><span class="mi">00</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">10</span>
<span class="mi">12</span><span class="p">:</span><span class="mi">00</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">40</span></code></pre></figure>
<h1 id="converting-between-streams-and-relations">Converting between streams and relations</h1>
<p>Look back at the definition of the <code class="language-plaintext highlighter-rouge">HourlyOrderTotals</code> view.
Is the view a stream or a relation?</p>
<p>It does not contain the <code class="language-plaintext highlighter-rouge">STREAM</code> keyword, so it is a relation.
However, it is a relation that can be converted into a stream.</p>
<p>You can use it in both relational and streaming queries:</p>
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="c1">-- A relation; will query the historic Orders table.</span>
<span class="c1">-- Returns the largest number of product #10 ever sold in one hour.</span>
<span class="k">SELECT</span> <span class="k">max</span><span class="p">(</span><span class="n">su</span><span class="p">)</span>
<span class="k">FROM</span> <span class="n">HourlyOrderTotals</span>
<span class="k">WHERE</span> <span class="n">productId</span> <span class="o">=</span> <span class="mi">10</span><span class="p">;</span>
<span class="c1">-- A stream; will query the Orders stream.</span>
<span class="c1">-- Returns every hour in which at least one product #10 was sold.</span>
<span class="k">SELECT</span> <span class="n">STREAM</span> <span class="n">rowtime</span>
<span class="k">FROM</span> <span class="n">HourlyOrderTotals</span>
<span class="k">WHERE</span> <span class="n">productId</span> <span class="o">=</span> <span class="mi">10</span><span class="p">;</span></code></pre></figure>
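<p>The same point can be made with one Python definition consumed in two ways. This is a hypothetical sketch, not Calcite's mechanism: <code class="language-plaintext highlighter-rouge">hourly_units</code> plays the role of the view (restricted to one product), <code class="language-plaintext highlighter-rouge">HISTORY</code> is a finite list standing in for the historic table (hours are labeled by the hour at which they end), and <code class="language-plaintext highlighter-rouge">live</code> is an invented unbounded stream. Over the finite history a <code class="language-plaintext highlighter-rouge">max</code> terminates; over the unbounded stream it never would, so the streaming consumer can only take results incrementally.</p>
<figure class="highlight"><pre><code class="language-python" data-lang="python">from itertools import count, islice

def hourly_units(rows, product_id):
    """Shared definition: per-hour unit totals for one product.

    Assumes (hour, productId, units) rows arrive in hour order, so an hour's
    total is emitted as soon as a later hour appears.
    """
    current, total = None, 0
    for hour, pid, units in rows:
        if pid != product_id:
            continue
        if current is not None and hour != current:
            yield current, total
            total = 0
        current = hour
        total += units
    if current is not None:
        yield current, total   # only reached when the input is finite

# Relation: a finite history of (hour, productId, units), so MAX terminates.
HISTORY = [(11, 30, 4), (11, 10, 1), (11, 20, 2), (11, 30, 20),
           (12, 10, 6), (12, 10, 1), (12, 40, 12), (12, 10, 4)]
largest = max(su for _, su in hourly_units(HISTORY, 10))

def live():
    """Hypothetical unbounded stream: two product #10 orders every hour."""
    for hour in count(13):
        yield hour, 10, 1
        yield hour, 10, 2

# Stream: the same definition, consumed incrementally.
first_hours = list(islice(hourly_units(live(), 10), 3))
print(largest, first_hours)</code></pre></figure>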
<p>This approach is not limited to views and sub-queries.
Following the approach set out in CQL [<a href="#ref1">1</a>], every query
in streaming SQL is defined as a relational query and converted to a stream
using the <code class="language-plaintext highlighter-rouge">STREAM</code> keyword in the top-most <code class="language-plaintext highlighter-rouge">SELECT</code>.</p>
<p>If the <code class="language-plaintext highlighter-rouge">STREAM</code> keyword is present in sub-queries or view definitions, it has no
effect.</p>
<p>At query preparation time, Calcite figures out whether the relations referenced
in the query can be converted to streams or historical relations.</p>
<p>Sometimes a stream makes available some of its history (say the last 24 hours of
data in an Apache Kafka [<a href="#ref2">2</a>] topic)
but not all. At run time, Calcite figures out whether there is sufficient
history to run the query, and if not, gives an error.</p>
<h1 id="the-pie-chart-problem-relational-queries-on-streams">The “pie chart” problem: Relational queries on streams</h1>
<p>One particular case where you need to convert a stream to a relation
occurs in what I call the “pie chart problem”. Imagine that you need to
write a web page with a chart, like the following, that summarizes the
number of orders for each product over the last hour.</p>
<p><img src="/img/pie-chart.png" alt="Pie chart"></p>
<p>But the <code class="language-plaintext highlighter-rouge">Orders</code> stream only contains a few records, not an hour’s summary.
We need to run a relational query on the history of the stream:</p>
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">SELECT</span> <span class="n">productId</span><span class="p">,</span> <span class="k">count</span><span class="p">(</span><span class="o">*</span><span class="p">)</span>
<span class="k">FROM</span> <span class="n">Orders</span>
<span class="k">WHERE</span> <span class="n">rowtime</span> <span class="k">BETWEEN</span> <span class="k">current_timestamp</span> <span class="o">-</span> <span class="n">INTERVAL</span> <span class="s1">'1'</span> <span class="n">HOUR</span>
<span class="k">AND</span> <span class="k">current_timestamp</span>
<span class="k">GROUP</span> <span class="k">BY</span> <span class="n">productId</span><span class="p">;</span></code></pre></figure>
<p>If the history of the <code class="language-plaintext highlighter-rouge">Orders</code> stream is being spooled to the <code class="language-plaintext highlighter-rouge">Orders</code> table,
we can answer the query, albeit at a high cost. Better still would be to tell the
system to materialize a one-hour summary into a table,
maintain it continuously as the stream flows,
and automatically rewrite queries to use the table.</p>
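<p>What such a continuously maintained summary might look like can be sketched in Python. This only illustrates the idea under stated assumptions; it does not model Calcite's materialization or query rewriting. <code class="language-plaintext highlighter-rouge">RollingCounts</code> and its methods are hypothetical, times are seconds since midnight, and rows must be added in rowtime order. Each order is added once and evicted once, so answering the pie-chart query never rescans the history.</p>
<figure class="highlight"><pre><code class="language-python" data-lang="python">from collections import Counter, deque

def hms(s):
    """Parse 'HH:MM:SS' into seconds since midnight."""
    h, m, sec = (int(p) for p in s.split(":"))
    return h * 3600 + m * 60 + sec

class RollingCounts:
    """Orders per product over the last hour, maintained incrementally."""

    def __init__(self, window_seconds=3600):
        self.window = window_seconds
        self.rows = deque()       # (rowtime, productId), oldest first
        self.counts = Counter()   # productId -> orders currently in the window

    def _evict(self, now):
        # Keep rows with rowtime BETWEEN now - window AND now (inclusive).
        while self.rows and now - self.window > self.rows[0][0]:
            _, product_id = self.rows.popleft()
            self.counts[product_id] -= 1
            if self.counts[product_id] == 0:
                del self.counts[product_id]

    def add(self, rowtime, product_id):
        self.rows.append((rowtime, product_id))
        self.counts[product_id] += 1
        self._evict(rowtime)

    def snapshot(self, now):
        """Answer the pie-chart query for the hour ending at `now`."""
        self._evict(now)
        return dict(self.counts)

summary = RollingCounts()
for t, product_id in [("10:17:00", 30), ("10:17:05", 10), ("10:18:05", 20),
                      ("10:18:07", 30), ("11:02:00", 10), ("11:04:00", 10),
                      ("11:09:30", 40), ("11:24:11", 10)]:
    summary.add(hms(t), product_id)
print(summary.snapshot(hms("11:30:00")))   # {10: 3, 40: 1}</code></pre></figure>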
<h1 id="sorting">Sorting</h1>
<p>The story for <code class="language-plaintext highlighter-rouge">ORDER BY</code> is similar to <code class="language-plaintext highlighter-rouge">GROUP BY</code>.
The syntax looks like regular SQL, but Calcite must be sure that it can deliver
timely results. It therefore requires a monotonic expression on the leading edge
of your <code class="language-plaintext highlighter-rouge">ORDER BY</code> key.</p>
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">SELECT</span> <span class="n">STREAM</span> <span class="n">CEIL</span><span class="p">(</span><span class="n">rowtime</span> <span class="k">TO</span> <span class="n">hour</span><span class="p">)</span> <span class="k">AS</span> <span class="n">rowtime</span><span class="p">,</span> <span class="n">productId</span><span class="p">,</span> <span class="n">orderId</span><span class="p">,</span> <span class="n">units</span>
<span class="k">FROM</span> <span class="n">Orders</span>
<span class="k">ORDER</span> <span class="k">BY</span> <span class="n">CEIL</span><span class="p">(</span><span class="n">rowtime</span> <span class="k">TO</span> <span class="n">hour</span><span class="p">)</span> <span class="k">ASC</span><span class="p">,</span> <span class="n">units</span> <span class="k">DESC</span><span class="p">;</span>
<span class="n">rowtime</span> <span class="o">|</span> <span class="n">productId</span> <span class="o">|</span> <span class="n">orderId</span> <span class="o">|</span> <span class="n">units</span>
<span class="c1">----------+-----------+---------+-------</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">00</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">30</span> <span class="o">|</span> <span class="mi">8</span> <span class="o">|</span> <span class="mi">20</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">00</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">30</span> <span class="o">|</span> <span class="mi">5</span> <span class="o">|</span> <span class="mi">4</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">00</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">20</span> <span class="o">|</span> <span class="mi">7</span> <span class="o">|</span> <span class="mi">2</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">00</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">6</span> <span class="o">|</span> <span class="mi">1</span>
<span class="mi">12</span><span class="p">:</span><span class="mi">00</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">40</span> <span class="o">|</span> <span class="mi">11</span> <span class="o">|</span> <span class="mi">12</span>
<span class="mi">12</span><span class="p">:</span><span class="mi">00</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">9</span> <span class="o">|</span> <span class="mi">6</span>
<span class="mi">12</span><span class="p">:</span><span class="mi">00</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">12</span> <span class="o">|</span> <span class="mi">4</span>
<span class="mi">12</span><span class="p">:</span><span class="mi">00</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">1</span></code></pre></figure>
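<p>Why a monotonic leading key makes this possible can be shown with a small Python sketch, assuming rows arrive in rowtime order. It is hypothetical (<code class="language-plaintext highlighter-rouge">ceil_to</code> and <code class="language-plaintext highlighter-rouge">streaming_sort</code> are invented names) and uses the sample <code class="language-plaintext highlighter-rouge">Orders</code> rows with times in seconds since midnight. Once a row with a later hour arrives, no earlier bucket can receive more rows, so that bucket can be sorted on the secondary key and emitted. Note that <code class="language-plaintext highlighter-rouge">CEIL(10:17:00 TO HOUR)</code> is 11:00:00, so the first hour's rows are labeled 11:00:00.</p>
<figure class="highlight"><pre><code class="language-python" data-lang="python">def hms(s):
    """Parse 'HH:MM:SS' into seconds since midnight."""
    h, m, sec = (int(p) for p in s.split(":"))
    return h * 3600 + m * 60 + sec

def ceil_to(ts, size):
    """CEIL(ts TO size) on integer seconds, e.g. 10:17:00 -> 11:00:00."""
    return -(-ts // size) * size

def streaming_sort(rows, size=3600):
    """ORDER BY CEIL(rowtime TO HOUR) ASC, units DESC over an ordered stream.

    Buffers only the current bucket; emits it, sorted on units, as soon as a
    row from a later bucket arrives.
    """
    current, bucket = None, []
    for rowtime, product_id, order_id, units in rows:
        key = ceil_to(rowtime, size)
        if current is not None and key != current:
            yield from sorted(bucket, key=lambda r: -r[3])
            bucket = []
        current = key
        bucket.append((key, product_id, order_id, units))
    yield from sorted(bucket, key=lambda r: -r[3])   # flush at end of input

# (rowtime, productId, orderId, units) for the sample Orders rows.
ORDERS = [(hms(t), p, o, u) for t, p, o, u in [
    ("10:17:00", 30, 5, 4), ("10:17:05", 10, 6, 1), ("10:18:05", 20, 7, 2),
    ("10:18:07", 30, 8, 20), ("11:02:00", 10, 9, 6), ("11:04:00", 10, 10, 1),
    ("11:09:30", 40, 11, 12), ("11:24:11", 10, 12, 4)]]

for row in streaming_sort(ORDERS):
    print(row)</code></pre></figure>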
<p>Most queries will return results in the order that they were inserted,
because the engine is using streaming algorithms, but you should not rely on it.
For example, consider this:</p>
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">SELECT</span> <span class="n">STREAM</span> <span class="o">*</span>
<span class="k">FROM</span> <span class="n">Orders</span>
<span class="k">WHERE</span> <span class="n">productId</span> <span class="o">=</span> <span class="mi">10</span>
<span class="k">UNION</span> <span class="k">ALL</span>
<span class="k">SELECT</span> <span class="n">STREAM</span> <span class="o">*</span>
<span class="k">FROM</span> <span class="n">Orders</span>
<span class="k">WHERE</span> <span class="n">productId</span> <span class="o">=</span> <span class="mi">30</span><span class="p">;</span>
<span class="n">rowtime</span> <span class="o">|</span> <span class="n">productId</span> <span class="o">|</span> <span class="n">orderId</span> <span class="o">|</span> <span class="n">units</span>
<span class="c1">----------+-----------+---------+-------</span>
<span class="mi">10</span><span class="p">:</span><span class="mi">17</span><span class="p">:</span><span class="mi">05</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">6</span> <span class="o">|</span> <span class="mi">1</span>
<span class="mi">10</span><span class="p">:</span><span class="mi">17</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">30</span> <span class="o">|</span> <span class="mi">5</span> <span class="o">|</span> <span class="mi">4</span>
<span class="mi">10</span><span class="p">:</span><span class="mi">18</span><span class="p">:</span><span class="mi">07</span> <span class="o">|</span> <span class="mi">30</span> <span class="o">|</span> <span class="mi">8</span> <span class="o">|</span> <span class="mi">20</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">02</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">9</span> <span class="o">|</span> <span class="mi">6</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">04</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">1</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">24</span><span class="p">:</span><span class="mi">11</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">12</span> <span class="o">|</span> <span class="mi">4</span></code></pre></figure>
<p>The rows with <code class="language-plaintext highlighter-rouge">productId</code> = 30 are apparently out of order, probably because
the <code class="language-plaintext highlighter-rouge">Orders</code> stream was partitioned on <code class="language-plaintext highlighter-rouge">productId</code> and the partitioned streams
sent their data at different times.</p>
<p>If you require a particular ordering, add an explicit <code class="language-plaintext highlighter-rouge">ORDER BY</code>:</p>
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">SELECT</span> <span class="n">STREAM</span> <span class="o">*</span>
<span class="k">FROM</span> <span class="n">Orders</span>
<span class="k">WHERE</span> <span class="n">productId</span> <span class="o">=</span> <span class="mi">10</span>
<span class="k">UNION</span> <span class="k">ALL</span>
<span class="k">SELECT</span> <span class="n">STREAM</span> <span class="o">*</span>
<span class="k">FROM</span> <span class="n">Orders</span>
<span class="k">WHERE</span> <span class="n">productId</span> <span class="o">=</span> <span class="mi">30</span>
<span class="k">ORDER</span> <span class="k">BY</span> <span class="n">rowtime</span><span class="p">;</span>
<span class="n">rowtime</span> <span class="o">|</span> <span class="n">productId</span> <span class="o">|</span> <span class="n">orderId</span> <span class="o">|</span> <span class="n">units</span>
<span class="c1">----------+-----------+---------+-------</span>
<span class="mi">10</span><span class="p">:</span><span class="mi">17</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">30</span> <span class="o">|</span> <span class="mi">5</span> <span class="o">|</span> <span class="mi">4</span>
<span class="mi">10</span><span class="p">:</span><span class="mi">17</span><span class="p">:</span><span class="mi">05</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">6</span> <span class="o">|</span> <span class="mi">1</span>
<span class="mi">10</span><span class="p">:</span><span class="mi">18</span><span class="p">:</span><span class="mi">07</span> <span class="o">|</span> <span class="mi">30</span> <span class="o">|</span> <span class="mi">8</span> <span class="o">|</span> <span class="mi">20</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">02</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">9</span> <span class="o">|</span> <span class="mi">6</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">04</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">1</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">24</span><span class="p">:</span><span class="mi">11</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">12</span> <span class="o">|</span> <span class="mi">4</span></code></pre></figure>
<p>Calcite will probably implement the <code class="language-plaintext highlighter-rouge">UNION ALL</code> by merging its inputs on <code class="language-plaintext highlighter-rouge">rowtime</code>,
which is only slightly less efficient than emitting rows as they arrive.</p>
<p>You only need to add an <code class="language-plaintext highlighter-rouge">ORDER BY</code> to the outermost query. If you need to,
say, perform <code class="language-plaintext highlighter-rouge">GROUP BY</code> after a <code class="language-plaintext highlighter-rouge">UNION ALL</code>, Calcite will add an <code class="language-plaintext highlighter-rouge">ORDER BY</code>
implicitly, in order to make the <code class="language-plaintext highlighter-rouge">GROUP BY</code> algorithm possible.</p>
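<p>Both points can be illustrated with a short Python sketch. It is not Calcite's implementation; it assumes that each partition of <code class="language-plaintext highlighter-rouge">Orders</code> arrives already sorted by <code class="language-plaintext highlighter-rouge">rowtime</code>, as the per-product rows in the example above do. <code class="language-plaintext highlighter-rouge">heapq.merge</code> plays the role of the merging <code class="language-plaintext highlighter-rouge">UNION ALL</code>, and <code class="language-plaintext highlighter-rouge">itertools.groupby</code> stands in for a streaming <code class="language-plaintext highlighter-rouge">GROUP BY</code> that closes each group as soon as its key changes. The helper names are illustrative.</p>

```python
import heapq
from itertools import groupby

# Rows are (rowtime, productId, orderId, units), taken from the example above.
# Zero-padded "HH:MM:SS" strings sort correctly as plain text.
product_10 = [("10:17:05", 10, 6, 1), ("11:02:00", 10, 9, 6),
              ("11:04:00", 10, 10, 1), ("11:24:11", 10, 12, 4)]
product_30 = [("10:17:00", 30, 5, 4), ("10:18:07", 30, 8, 20)]


def merge_by_rowtime(*partitions):
    """UNION ALL ... ORDER BY rowtime as a lazy k-way merge of sorted inputs."""
    return heapq.merge(*partitions, key=lambda row: row[0])


def units_per_hour(rows):
    """Streaming GROUP BY FLOOR(rowtime TO HOUR): each group is emitted as soon
    as the hour changes, which is only correct if the rows arrive sorted."""
    for hour, group in groupby(rows, key=lambda row: row[0][:2] + ":00:00"):
        yield hour, sum(row[3] for row in group)


merged = list(merge_by_rowtime(product_10, product_30))
print([row[2] for row in merged])    # [5, 6, 8, 9, 10, 12]
print(list(units_per_hour(merged)))  # [('10:00:00', 25), ('11:00:00', 11)]

# Without the merge, hour 10 is split into two groups:
print(list(units_per_hour(product_10 + product_30)))
# [('10:00:00', 1), ('11:00:00', 11), ('10:00:00', 24)]
```

<p>The last call shows why the sort matters: on unsorted input, a group-as-you-go algorithm emits hour 10 twice.</p>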
<h1 id="table-constructor">Table constructor</h1>
<p>The <code class="language-plaintext highlighter-rouge">VALUES</code> clause creates an inline table with a given set of rows.</p>
<p>Streaming is disallowed. The set of rows never changes, and therefore a stream
would never return any rows.</p>
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="o">&gt;</span> <span class="k">SELECT</span> <span class="n">STREAM</span> <span class="o">*</span> <span class="k">FROM</span> <span class="p">(</span><span class="k">VALUES</span> <span class="p">(</span><span class="mi">1</span><span class="p">,</span> <span class="s1">'abc'</span><span class="p">));</span>
<span class="n">ERROR</span><span class="p">:</span> <span class="n">Cannot</span> <span class="n">stream</span> <span class="k">VALUES</span></code></pre></figure>
<h1 id="sliding-windows">Sliding windows</h1>
<p>Standard SQL features so-called “analytic functions” that can be used in the
<code class="language-plaintext highlighter-rouge">SELECT</code> clause. Unlike <code class="language-plaintext highlighter-rouge">GROUP BY</code>, these do not collapse records. For each
record that goes in, one record comes out. But the aggregate function is based
on a window of many rows.</p>
<p>Let’s look at an example.</p>
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">SELECT</span> <span class="n">STREAM</span> <span class="n">rowtime</span><span class="p">,</span>
<span class="n">productId</span><span class="p">,</span>
<span class="n">units</span><span class="p">,</span>
<span class="k">SUM</span><span class="p">(</span><span class="n">units</span><span class="p">)</span> <span class="n">OVER</span> <span class="p">(</span><span class="k">ORDER</span> <span class="k">BY</span> <span class="n">rowtime</span> <span class="k">RANGE</span> <span class="n">INTERVAL</span> <span class="s1">'1'</span> <span class="n">HOUR</span> <span class="k">PRECEDING</span><span class="p">)</span> <span class="n">unitsLastHour</span>
<span class="k">FROM</span> <span class="n">Orders</span><span class="p">;</span></code></pre></figure>
<p>The feature packs a lot of power with little effort. You can have multiple
functions in the <code class="language-plaintext highlighter-rouge">SELECT</code> clause, based on multiple window specifications.</p>
<p>The following example returns orders whose average order size over the last
10 minutes is greater than the average order size for the last week.</p>
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">SELECT</span> <span class="n">STREAM</span> <span class="o">*</span>
<span class="k">FROM</span> <span class="p">(</span>
<span class="k">SELECT</span> <span class="n">STREAM</span> <span class="n">rowtime</span><span class="p">,</span>
<span class="n">productId</span><span class="p">,</span>
<span class="n">units</span><span class="p">,</span>
<span class="k">AVG</span><span class="p">(</span><span class="n">units</span><span class="p">)</span> <span class="n">OVER</span> <span class="p">(</span><span class="n">product</span> <span class="k">RANGE</span> <span class="n">INTERVAL</span> <span class="s1">'10'</span> <span class="k">MINUTE</span> <span class="k">PRECEDING</span><span class="p">)</span> <span class="k">AS</span> <span class="n">m10</span><span class="p">,</span>
<span class="k">AVG</span><span class="p">(</span><span class="n">units</span><span class="p">)</span> <span class="n">OVER</span> <span class="p">(</span><span class="n">product</span> <span class="k">RANGE</span> <span class="n">INTERVAL</span> <span class="s1">'7'</span> <span class="k">DAY</span> <span class="k">PRECEDING</span><span class="p">)</span> <span class="k">AS</span> <span class="n">d7</span>
<span class="k">FROM</span> <span class="n">Orders</span>
<span class="k">WINDOW</span> <span class="n">product</span> <span class="k">AS</span> <span class="p">(</span>
<span class="k">PARTITION</span> <span class="k">BY</span> <span class="n">productId</span>
<span class="k">ORDER</span> <span class="k">BY</span> <span class="n">rowtime</span><span class="p">))</span>
<span class="k">WHERE</span> <span class="n">m10</span> <span class="o">&gt;</span> <span class="n">d7</span><span class="p">;</span></code></pre></figure>
<p>For conciseness, here we use a syntax where you partially define a window
using a <code class="language-plaintext highlighter-rouge">WINDOW</code> clause and then refine the window in each <code class="language-plaintext highlighter-rouge">OVER</code> clause.
You could also define all windows in the <code class="language-plaintext highlighter-rouge">WINDOW</code> clause, or all windows inline,
if you wish.</p>
<p>But the real power goes beyond syntax. Behind the scenes, this query
maintains two tables, adding values to and removing values from sub-totals
using FIFO queues. Yet you can access those tables without introducing a join
into the query.</p>
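<p>The FIFO-queue technique can be sketched in Python as follows. This is an illustration under simplifying assumptions, not Calcite's code: rows arrive in <code class="language-plaintext highlighter-rouge">rowtime</code> order, timestamps are seconds within a single day, and the class name <code class="language-plaintext highlighter-rouge">RangeWindow</code> is made up for this example. Each window keeps, per <code class="language-plaintext highlighter-rouge">productId</code> partition, a queue of recent rows and a running sum; a new row is added at the tail and expired rows are subtracted as they leave the head, so each row costs amortized constant time.</p>

```python
from collections import defaultdict, deque

ORDERS = [  # (rowtime, productId, orderId, units), from the example Orders stream
    ("10:17:00", 30, 5, 4), ("10:17:05", 10, 6, 1), ("10:18:05", 20, 7, 2),
    ("10:18:07", 30, 8, 20), ("11:02:00", 10, 9, 6), ("11:04:00", 10, 10, 1),
    ("11:09:30", 40, 11, 12), ("11:24:11", 10, 12, 4),
]


def seconds(hms):
    h, m, s = map(int, hms.split(":"))
    return h * 3600 + m * 60 + s


class RangeWindow:
    """AVG over RANGE <width> PRECEDING, kept separately for each partition key."""

    def __init__(self, width_seconds):
        self.width = width_seconds
        self.queues = defaultdict(deque)  # key -> FIFO of (time, value)
        self.sums = defaultdict(int)      # key -> sum of the values in the FIFO

    def add(self, key, time, value):
        queue = self.queues[key]
        queue.append((time, value))
        self.sums[key] += value
        # Expire rows older than (time - width); the current row always stays.
        while queue[0][0] < time - self.width:
            _, old_value = queue.popleft()
            self.sums[key] -= old_value
        return self.sums[key] / len(queue)


def orders_above_weekly_average(orders):
    m10 = RangeWindow(10 * 60)         # RANGE INTERVAL '10' MINUTE PRECEDING
    d7 = RangeWindow(7 * 24 * 3600)    # RANGE INTERVAL '7' DAY PRECEDING
    for rowtime, product_id, order_id, units in orders:
        t = seconds(rowtime)
        avg_10m = m10.add(product_id, t, units)  # PARTITION BY productId
        avg_7d = d7.add(product_id, t, units)
        if avg_10m > avg_7d:                     # WHERE m10 > d7
            yield rowtime, product_id, order_id, units


print([row[2] for row in orders_above_weekly_average(ORDERS)])  # [9, 10, 12]
```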
<p>Some other features of the windowed aggregation syntax:</p>
<ul>
<li>You can define windows based on row count.</li>
<li>The window can reference rows that have not yet arrived.
(The stream will wait until they have arrived.)</li>
<li>You can compute order-dependent functions such as <code class="language-plaintext highlighter-rouge">RANK</code> and median.</li>
</ul>
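<p>A window based on row count needs no timestamp arithmetic at all. The Python sketch below (an illustration only, assuming rows arrive in <code class="language-plaintext highlighter-rouge">rowtime</code> order; the helper name is made up) computes the equivalent of <code class="language-plaintext highlighter-rouge">SUM(units) OVER (PARTITION BY productId ORDER BY rowtime ROWS 1 PRECEDING)</code>, that is, each order's units plus those of the previous order for the same product, using a bounded <code class="language-plaintext highlighter-rouge">collections.deque</code> per partition.</p>

```python
from collections import defaultdict, deque

ORDERS = [  # (rowtime, productId, orderId, units), from the example Orders stream
    ("10:17:00", 30, 5, 4), ("10:17:05", 10, 6, 1), ("10:18:05", 20, 7, 2),
    ("10:18:07", 30, 8, 20), ("11:02:00", 10, 9, 6), ("11:04:00", 10, 10, 1),
    ("11:09:30", 40, 11, 12), ("11:24:11", 10, 12, 4),
]


def sum_rows_preceding(orders, n):
    """SUM(units) OVER (PARTITION BY productId ORDER BY rowtime ROWS n PRECEDING)."""
    windows = defaultdict(lambda: deque(maxlen=n + 1))  # current row + n before it
    totals = defaultdict(int)
    for rowtime, product_id, order_id, units in orders:
        window = windows[product_id]
        if len(window) == window.maxlen:
            totals[product_id] -= window[0]  # this row is about to be pushed out
        window.append(units)                 # a full deque drops its oldest item
        totals[product_id] += units
        yield order_id, totals[product_id]


print(list(sum_rows_preceding(ORDERS, 1)))
# [(5, 4), (6, 1), (7, 2), (8, 24), (9, 7), (10, 7), (11, 12), (12, 5)]
```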
<h1 id="cascading-windows">Cascading windows</h1>
<p>What if we want a query that returns a result for every record, like a
sliding window, but resets totals on a fixed time period, like a
tumbling window? Such a pattern is called a <em>cascading window</em>. Here
is an example:</p>
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">SELECT</span> <span class="n">STREAM</span> <span class="n">rowtime</span><span class="p">,</span>
<span class="n">productId</span><span class="p">,</span>
<span class="n">units</span><span class="p">,</span>
<span class="k">SUM</span><span class="p">(</span><span class="n">units</span><span class="p">)</span> <span class="n">OVER</span> <span class="p">(</span><span class="k">PARTITION</span> <span class="k">BY</span> <span class="n">FLOOR</span><span class="p">(</span><span class="n">rowtime</span> <span class="k">TO</span> <span class="n">HOUR</span><span class="p">)</span> <span class="k">ORDER</span> <span class="k">BY</span> <span class="n">rowtime</span><span class="p">)</span> <span class="k">AS</span> <span class="n">unitsSinceTopOfHour</span>
<span class="k">FROM</span> <span class="n">Orders</span><span class="p">;</span></code></pre></figure>
<p>It looks similar to a sliding window query, but the monotonic
expression occurs within the <code class="language-plaintext highlighter-rouge">PARTITION BY</code> clause of the window. As
the rowtime moves from 10:59:59 to 11:00:00,
<code class="language-plaintext highlighter-rouge">FLOOR(rowtime TO HOUR)</code> changes from 10:00:00 to 11:00:00,
and therefore a new partition starts.
The first row to arrive in the new hour will start a
new total; the second row will have a total that consists of two rows,
and so on.</p>
<p>Calcite knows that the old partition will never be used again, so it
removes all sub-totals for that partition from its internal storage.</p>
<p>Analytic functions that use cascading and sliding windows can be
combined in the same query.</p>
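<p>Because <code class="language-plaintext highlighter-rouge">rowtime</code> is ascending, the bookkeeping is simple. The following Python sketch (not Calcite's implementation; the helper name is illustrative) keeps a single running total, matching the example query, which partitions only on <code class="language-plaintext highlighter-rouge">FLOOR(rowtime TO HOUR)</code>. It resets the total when the hour changes and never looks at the old partition again.</p>

```python
ORDERS = [  # (rowtime, productId, orderId, units), from the example Orders stream
    ("10:17:00", 30, 5, 4), ("10:17:05", 10, 6, 1), ("10:18:05", 20, 7, 2),
    ("10:18:07", 30, 8, 20), ("11:02:00", 10, 9, 6), ("11:04:00", 10, 10, 1),
    ("11:09:30", 40, 11, 12), ("11:24:11", 10, 12, 4),
]


def units_since_top_of_hour(orders):
    """Running SUM(units) within each FLOOR(rowtime TO HOUR) partition.

    Assumes rows arrive in rowtime order, so once the hour changes the old
    partition can never receive another row and its sub-total is dropped.
    """
    current_hour, total = None, 0
    for rowtime, product_id, order_id, units in orders:
        hour = rowtime[:2] + ":00:00"      # FLOOR(rowtime TO HOUR) for "HH:MM:SS"
        if hour != current_hour:
            current_hour, total = hour, 0  # new partition; forget the old sub-total
        total += units
        yield rowtime, product_id, units, total


for row in units_since_top_of_hour(ORDERS):
    print(row)
```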
<h1 id="joining-streams-to-tables">Joining streams to tables</h1>
<p>There are two kinds of join where streams are concerned: stream-to-table
join and stream-to-stream join.</p>
<p>A stream-to-table join is straightforward if the contents of the table
are not changing. This query enriches a stream of orders with
each product’s list price:</p>
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">SELECT</span> <span class="n">STREAM</span> <span class="n">o</span><span class="p">.</span><span class="n">rowtime</span><span class="p">,</span> <span class="n">o</span><span class="p">.</span><span class="n">productId</span><span class="p">,</span> <span class="n">o</span><span class="p">.</span><span class="n">orderId</span><span class="p">,</span> <span class="n">o</span><span class="p">.</span><span class="n">units</span><span class="p">,</span>
<span class="n">p</span><span class="p">.</span><span class="n">name</span><span class="p">,</span> <span class="n">p</span><span class="p">.</span><span class="n">unitPrice</span>
<span class="k">FROM</span> <span class="n">Orders</span> <span class="k">AS</span> <span class="n">o</span>
<span class="k">JOIN</span> <span class="n">Products</span> <span class="k">AS</span> <span class="n">p</span>
<span class="k">ON</span> <span class="n">o</span><span class="p">.</span><span class="n">productId</span> <span class="o">=</span> <span class="n">p</span><span class="p">.</span><span class="n">productId</span><span class="p">;</span>
<span class="n">rowtime</span> <span class="o">|</span> <span class="n">productId</span> <span class="o">|</span> <span class="n">orderId</span> <span class="o">|</span> <span class="n">units</span> <span class="o">|</span> <span class="n">name</span> <span class="o">|</span> <span class="n">unitPrice</span>
<span class="c1">----------+-----------+---------+-------+--------+-----------</span>
<span class="mi">10</span><span class="p">:</span><span class="mi">17</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">30</span> <span class="o">|</span> <span class="mi">5</span> <span class="o">|</span> <span class="mi">4</span> <span class="o">|</span> <span class="n">Cheese</span> <span class="o">|</span> <span class="mi">17</span>
<span class="mi">10</span><span class="p">:</span><span class="mi">17</span><span class="p">:</span><span class="mi">05</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">6</span> <span class="o">|</span> <span class="mi">1</span> <span class="o">|</span> <span class="n">Beer</span> <span class="o">|</span> <span class="mi">0</span><span class="p">.</span><span class="mi">25</span>
<span class="mi">10</span><span class="p">:</span><span class="mi">18</span><span class="p">:</span><span class="mi">05</span> <span class="o">|</span> <span class="mi">20</span> <span class="o">|</span> <span class="mi">7</span> <span class="o">|</span> <span class="mi">2</span> <span class="o">|</span> <span class="n">Wine</span> <span class="o">|</span> <span class="mi">6</span>
<span class="mi">10</span><span class="p">:</span><span class="mi">18</span><span class="p">:</span><span class="mi">07</span> <span class="o">|</span> <span class="mi">30</span> <span class="o">|</span> <span class="mi">8</span> <span class="o">|</span> <span class="mi">20</span> <span class="o">|</span> <span class="n">Cheese</span> <span class="o">|</span> <span class="mi">17</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">02</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">9</span> <span class="o">|</span> <span class="mi">6</span> <span class="o">|</span> <span class="n">Beer</span> <span class="o">|</span> <span class="mi">0</span><span class="p">.</span><span class="mi">25</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">04</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">1</span> <span class="o">|</span> <span class="n">Beer</span> <span class="o">|</span> <span class="mi">0</span><span class="p">.</span><span class="mi">25</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">09</span><span class="p">:</span><span class="mi">30</span> <span class="o">|</span> <span class="mi">40</span> <span class="o">|</span> <span class="mi">11</span> <span class="o">|</span> <span class="mi">12</span> <span class="o">|</span> <span class="n">Bread</span> <span class="o">|</span> <span class="mi">100</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">24</span><span class="p">:</span><span class="mi">11</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">12</span> <span class="o">|</span> <span class="mi">4</span> <span class="o">|</span> <span class="n">Beer</span> <span class="o">|</span> <span class="mi">0</span><span class="p">.</span><span class="mi">25</span></code></pre></figure>
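<p>When the table does not change, the join needs no state beyond the table itself: each arriving order is enriched by a lookup on <code class="language-plaintext highlighter-rouge">productId</code>, and output can be emitted immediately, in the same order as the input. A minimal Python sketch, assuming (for illustration) that the <code class="language-plaintext highlighter-rouge">Products</code> table is held in a dictionary built from the output above:</p>

```python
ORDERS = [  # (rowtime, productId, orderId, units), from the example Orders stream
    ("10:17:00", 30, 5, 4), ("10:17:05", 10, 6, 1), ("10:18:05", 20, 7, 2),
    ("10:18:07", 30, 8, 20), ("11:02:00", 10, 9, 6), ("11:04:00", 10, 10, 1),
    ("11:09:30", 40, 11, 12), ("11:24:11", 10, 12, 4),
]
# The Products table implied by the output above: productId -> (name, unitPrice).
PRODUCTS = {10: ("Beer", 0.25), 20: ("Wine", 6), 30: ("Cheese", 17), 40: ("Bread", 100)}


def join_to_table(orders, products):
    """Inner join of a stream to an unchanging table: one hash lookup per order."""
    for rowtime, product_id, order_id, units in orders:
        match = products.get(product_id)
        if match is not None:  # inner join: orders without a product are dropped
            name, unit_price = match
            yield rowtime, product_id, order_id, units, name, unit_price


for row in join_to_table(ORDERS, PRODUCTS):
    print(row)
```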
<p>What should happen if the table is changing? For example,
suppose the unit price of product 10 is increased to 0.35 at 11:00.
Orders placed before 11:00 should have the old price, and orders
placed after 11:00 should reflect the new price.</p>
<p>One way to implement this is to have a table that keeps every version
with a start and end effective date, <code class="language-plaintext highlighter-rouge">ProductVersions</code> in the following
example:</p>
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">SELECT</span> <span class="n">STREAM</span> <span class="o">*</span>
<span class="k">FROM</span> <span class="n">Orders</span> <span class="k">AS</span> <span class="n">o</span>
<span class="k">JOIN</span> <span class="n">ProductVersions</span> <span class="k">AS</span> <span class="n">p</span>
<span class="k">ON</span> <span class="n">o</span><span class="p">.</span><span class="n">productId</span> <span class="o">=</span> <span class="n">p</span><span class="p">.</span><span class="n">productId</span>
<span class="k">AND</span> <span class="n">o</span><span class="p">.</span><span class="n">rowtime</span> <span class="k">BETWEEN</span> <span class="n">p</span><span class="p">.</span><span class="n">startDate</span> <span class="k">AND</span> <span class="n">p</span><span class="p">.</span><span class="n">endDate</span><span class="p">;</span>
<span class="n">rowtime</span> <span class="o">|</span> <span class="n">productId</span> <span class="o">|</span> <span class="n">orderId</span> <span class="o">|</span> <span class="n">units</span> <span class="o">|</span> <span class="n">productId1</span> <span class="o">|</span> <span class="n">name</span> <span class="o">|</span> <span class="n">unitPrice</span>
<span class="c1">----------+-----------+---------+-------+------------+--------+-----------</span>
<span class="mi">10</span><span class="p">:</span><span class="mi">17</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">30</span> <span class="o">|</span> <span class="mi">5</span> <span class="o">|</span> <span class="mi">4</span> <span class="o">|</span> <span class="mi">30</span> <span class="o">|</span> <span class="n">Cheese</span> <span class="o">|</span> <span class="mi">17</span>
<span class="mi">10</span><span class="p">:</span><span class="mi">17</span><span class="p">:</span><span class="mi">05</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">6</span> <span class="o">|</span> <span class="mi">1</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="n">Beer</span> <span class="o">|</span> <span class="mi">0</span><span class="p">.</span><span class="mi">25</span>
<span class="mi">10</span><span class="p">:</span><span class="mi">18</span><span class="p">:</span><span class="mi">05</span> <span class="o">|</span> <span class="mi">20</span> <span class="o">|</span> <span class="mi">7</span> <span class="o">|</span> <span class="mi">2</span> <span class="o">|</span> <span class="mi">20</span> <span class="o">|</span> <span class="n">Wine</span> <span class="o">|</span> <span class="mi">6</span>
<span class="mi">10</span><span class="p">:</span><span class="mi">18</span><span class="p">:</span><span class="mi">07</span> <span class="o">|</span> <span class="mi">30</span> <span class="o">|</span> <span class="mi">8</span> <span class="o">|</span> <span class="mi">20</span> <span class="o">|</span> <span class="mi">30</span> <span class="o">|</span> <span class="n">Cheese</span> <span class="o">|</span> <span class="mi">17</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">02</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">9</span> <span class="o">|</span> <span class="mi">6</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="n">Beer</span> <span class="o">|</span> <span class="mi">0</span><span class="p">.</span><span class="mi">35</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">04</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">1</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="n">Beer</span> <span class="o">|</span> <span class="mi">0</span><span class="p">.</span><span class="mi">35</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">09</span><span class="p">:</span><span class="mi">30</span> <span class="o">|</span> <span class="mi">40</span> <span class="o">|</span> <span class="mi">11</span> <span class="o">|</span> <span class="mi">12</span> <span class="o">|</span> <span class="mi">40</span> <span class="o">|</span> <span class="n">Bread</span> <span class="o">|</span> <span class="mi">100</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">24</span><span class="p">:</span><span class="mi">11</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">12</span> <span class="o">|</span> <span class="mi">4</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="n">Beer</span> <span class="o">|</span> <span class="mi">0</span><span class="p">.</span><span class="mi">35</span></code></pre></figure>
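<p>The join on <code class="language-plaintext highlighter-rouge">ProductVersions</code> amounts to finding, for each order, the version whose effective interval contains the order's <code class="language-plaintext highlighter-rouge">rowtime</code>. The following Python sketch does this with a binary search over each product's versions. The table contents are hypothetical (the text above only says that the price of product 10 changes at 11:00), and versions are assumed not to overlap; because <code class="language-plaintext highlighter-rouge">BETWEEN</code> includes both ends, the first version here ends at 10:59:59 so that an order at exactly 11:00:00 matches a single version.</p>

```python
import bisect

ORDERS = [  # (rowtime, productId, orderId, units), from the example Orders stream
    ("10:17:00", 30, 5, 4), ("10:17:05", 10, 6, 1), ("10:18:05", 20, 7, 2),
    ("10:18:07", 30, 8, 20), ("11:02:00", 10, 9, 6), ("11:04:00", 10, 10, 1),
    ("11:09:30", 40, 11, 12), ("11:24:11", 10, 12, 4),
]
# Hypothetical ProductVersions contents: productId -> versions sorted by startDate,
# each (startDate, endDate, name, unitPrice). Intervals must not overlap.
VERSIONS = {
    10: [("00:00:00", "10:59:59", "Beer", 0.25),
         ("11:00:00", "23:59:59", "Beer", 0.35)],
    20: [("00:00:00", "23:59:59", "Wine", 6)],
    30: [("00:00:00", "23:59:59", "Cheese", 17)],
    40: [("00:00:00", "23:59:59", "Bread", 100)],
}
STARTS = {pid: [v[0] for v in versions] for pid, versions in VERSIONS.items()}


def version_at(product_id, rowtime):
    """Return (name, unitPrice) of the version with startDate <= rowtime <= endDate."""
    starts = STARTS.get(product_id, [])
    i = bisect.bisect_right(starts, rowtime) - 1  # last version starting at or before rowtime
    if i < 0:
        return None
    _, end_date, name, unit_price = VERSIONS[product_id][i]
    return (name, unit_price) if rowtime <= end_date else None


def join_versions(orders):
    for rowtime, product_id, order_id, units in orders:
        match = version_at(product_id, rowtime)
        if match is not None:  # inner join
            yield (rowtime, product_id, order_id, units) + match


for row in join_versions(ORDERS):
    print(row)
```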
<p>The other way to implement this is to use a database with temporal support
(the ability to find the contents of the database as it was at any moment
in the past). In that case, the system also needs to know that the <code class="language-plaintext highlighter-rouge">rowtime</code> column of
the <code class="language-plaintext highlighter-rouge">Orders</code> stream corresponds to the transaction timestamp of the
<code class="language-plaintext highlighter-rouge">Products</code> table.</p>
<p>For many applications, temporal support or a versioned table is not worth
the cost and effort. It is acceptable to such an application that the query
gives different results when replayed: in this example, on replay,
all orders of product 10 are assigned the later unit price, 0.35.</p>
<h1 id="joining-streams-to-streams">Joining streams to streams</h1>
<p>It makes sense to join two streams if the join condition somehow forces
them to remain a finite distance from one another. In the following query,
the ship date is within one hour of the order date:</p>
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">SELECT</span> <span class="n">STREAM</span> <span class="n">o</span><span class="p">.</span><span class="n">rowtime</span><span class="p">,</span> <span class="n">o</span><span class="p">.</span><span class="n">productId</span><span class="p">,</span> <span class="n">o</span><span class="p">.</span><span class="n">orderId</span><span class="p">,</span> <span class="n">s</span><span class="p">.</span><span class="n">rowtime</span> <span class="k">AS</span> <span class="n">shipTime</span>
<span class="k">FROM</span> <span class="n">Orders</span> <span class="k">AS</span> <span class="n">o</span>
<span class="k">JOIN</span> <span class="n">Shipments</span> <span class="k">AS</span> <span class="n">s</span>
<span class="k">ON</span> <span class="n">o</span><span class="p">.</span><span class="n">orderId</span> <span class="o">=</span> <span class="n">s</span><span class="p">.</span><span class="n">orderId</span>
<span class="k">AND</span> <span class="n">s</span><span class="p">.</span><span class="n">rowtime</span> <span class="k">BETWEEN</span> <span class="n">o</span><span class="p">.</span><span class="n">rowtime</span> <span class="k">AND</span> <span class="n">o</span><span class="p">.</span><span class="n">rowtime</span> <span class="o">+</span> <span class="n">INTERVAL</span> <span class="s1">'1'</span> <span class="n">HOUR</span><span class="p">;</span>
<span class="n">rowtime</span> <span class="o">|</span> <span class="n">productId</span> <span class="o">|</span> <span class="n">orderId</span> <span class="o">|</span> <span class="n">shipTime</span>
<span class="c1">----------+-----------+---------+----------</span>
<span class="mi">10</span><span class="p">:</span><span class="mi">17</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">30</span> <span class="o">|</span> <span class="mi">5</span> <span class="o">|</span> <span class="mi">10</span><span class="p">:</span><span class="mi">55</span><span class="p">:</span><span class="mi">00</span>
<span class="mi">10</span><span class="p">:</span><span class="mi">17</span><span class="p">:</span><span class="mi">05</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">6</span> <span class="o">|</span> <span class="mi">10</span><span class="p">:</span><span class="mi">20</span><span class="p">:</span><span class="mi">00</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">02</span><span class="p">:</span><span class="mi">00</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">9</span> <span class="o">|</span> <span class="mi">11</span><span class="p">:</span><span class="mi">58</span><span class="p">:</span><span class="mi">00</span>
<span class="mi">11</span><span class="p">:</span><span class="mi">24</span><span class="p">:</span><span class="mi">11</span> <span class="o">|</span> <span class="mi">10</span> <span class="o">|</span> <span class="mi">12</span> <span class="o">|</span> <span class="mi">11</span><span class="p">:</span><span class="mi">44</span><span class="p">:</span><span class="mi">00</span></code></pre></figure>
<p>Note that quite a few orders do not appear, because they did not ship
within an hour. By the time the system receives order 12, timestamped 11:24:11,
it has already removed orders up to and including order 8, timestamped 10:18:07,
from its hash table.</p>
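<p>The eviction described above can be sketched in Python. This is an illustration under stated assumptions, not Calcite's implementation: both inputs are sorted by <code class="language-plaintext highlighter-rouge">rowtime</code>, timestamps fall within one day, the function name is made up, and the <code class="language-plaintext highlighter-rouge">Shipments</code> rows are hypothetical apart from the four that appear in the output above (the late shipment of order 8 is invented to show a non-match). Orders wait in a hash table keyed by <code class="language-plaintext highlighter-rouge">orderId</code>; since shipment times only increase, an order can be dropped once the merged stream has moved more than an hour past its <code class="language-plaintext highlighter-rouge">rowtime</code>.</p>

```python
import heapq
from collections import deque

ORDERS = [  # (rowtime, productId, orderId, units), from the example Orders stream
    ("10:17:00", 30, 5, 4), ("10:17:05", 10, 6, 1), ("10:18:05", 20, 7, 2),
    ("10:18:07", 30, 8, 20), ("11:02:00", 10, 9, 6), ("11:04:00", 10, 10, 1),
    ("11:09:30", 40, 11, 12), ("11:24:11", 10, 12, 4),
]
# Hypothetical Shipments stream (rowtime, orderId), sorted by rowtime.
SHIPMENTS = [("10:20:00", 6), ("10:55:00", 5), ("11:30:00", 8),
             ("11:44:00", 12), ("11:58:00", 9)]
ONE_HOUR = 3600


def seconds(hms):
    h, m, s = map(int, hms.split(":"))
    return h * 3600 + m * 60 + s


def join_within_hour(orders, shipments):
    """JOIN ON orderId AND s.rowtime BETWEEN o.rowtime AND o.rowtime + 1 hour.

    Returns (results, evicted): the joined rows, and the orderIds dropped
    from the hash table, in the order they were dropped.
    """
    # Merge both sorted streams by rowtime; on a tie, orders (0) before shipments (1).
    events = heapq.merge(
        ((seconds(o[0]), 0, o) for o in orders),
        ((seconds(s[0]), 1, s) for s in shipments),
        key=lambda event: event[:2],
    )
    pending = {}      # hash table: orderId -> order row
    expiry = deque()  # (order time, orderId), oldest first
    results, evicted = [], []
    for time, kind, row in events:
        # No shipment at `time` or later can match an order over an hour older.
        while expiry and expiry[0][0] + ONE_HOUR < time:
            _, old_id = expiry.popleft()
            pending.pop(old_id, None)
            evicted.append(old_id)
        if kind == 0:
            pending[row[2]] = row
            expiry.append((time, row[2]))
        else:
            ship_time, order_id = row
            order = pending.get(order_id)
            if order is not None:
                results.append((order[0], order[1], order_id, ship_time))
    return results, evicted


results, evicted = join_within_hour(ORDERS, SHIPMENTS)
for row in sorted(results):
    print(row)
print(evicted)  # [5, 6, 7, 8]
```

<p>Processing orders before shipments when timestamps tie keeps a shipment at exactly <code class="language-plaintext highlighter-rouge">o.rowtime</code> eligible, since <code class="language-plaintext highlighter-rouge">BETWEEN</code> is inclusive.</p>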
<p>As you can see, the “lock step”, tying together monotonic or quasi-monotonic
columns of the two streams, is necessary for the system to make progress.
It will refuse to execute a query if it cannot deduce a lock step.</p>
<h1 id="dml">DML</h1>
<p>It’s not only queries that make sense against streams;
it also makes sense to run DML statements (<code class="language-plaintext highlighter-rouge">INSERT</code>, <code class="language-plaintext highlighter-rouge">UPDATE</code>, <code class="language-plaintext highlighter-rouge">DELETE</code>,
and also their rarer cousins <code class="language-plaintext highlighter-rouge">UPSERT</code> and <code class="language-plaintext highlighter-rouge">REPLACE</code>) against streams.</p>
<p>DML is useful because it allows you to materialize streams
or tables based on streams,
and therefore save effort when values are used often.</p>
<p>Streaming applications often consist of pipelines of queries,
each query transforming input stream(s) to output stream(s).
A component of a pipeline can be a view:</p>
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">CREATE</span> <span class="k">VIEW</span> <span class="n">LargeOrders</span> <span class="k">AS</span>
<span class="k">SELECT</span> <span class="n">STREAM</span> <span class="o">*</span> <span class="k">FROM</span> <span class="n">Orders</span> <span class="k">WHERE</span> <span class="n">units</span> <span class="o">&gt;</span> <span class="mi">1000</span><span class="p">;</span></code></pre></figure>
<p>or a standing <code class="language-plaintext highlighter-rouge">INSERT</code> statement:</p>
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="k">INSERT</span> <span class="k">INTO</span> <span class="n">LargeOrders</span>
<span class="k">SELECT</span> <span class="n">STREAM</span> <span class="o">*</span> <span class="k">FROM</span> <span class="n">Orders</span> <span class="k">WHERE</span> <span class="n">units</span> <span class="o">&gt;</span> <span class="mi">1000</span><span class="p">;</span></code></pre></figure>
<p>These look similar, and in both cases the next step(s) in the pipeline
can read from <code class="language-plaintext highlighter-rouge">LargeOrders</code> without worrying how it was populated.
There is a difference in efficiency: the <code class="language-plaintext highlighter-rouge">INSERT</code> statement does the
same work no matter how many consumers there are; the view does work
proportional to the number of consumers, and in particular, does no
work if there are no consumers.</p>
<p>Other forms of DML make sense for streams. For example, the following
standing <code class="language-plaintext highlighter-rouge">UPSERT</code> statement maintains a table that materializes a summary
of the last hour of orders:</p>
<figure class="highlight"><pre><code class="language-sql" data-lang="sql"><span class="n">UPSERT</span> <span class="k">INTO</span> <span class="n">OrdersSummary</span>
<span class="k">SELECT</span> <span class="n">STREAM</span> <span class="n">productId</span><span class="p">,</span>
<span class="k">COUNT</span><span class="p">(</span><span class="o">*</span><span class="p">)</span> <span class="n">OVER</span> <span class="n">lastHour</span> <span class="k">AS</span> <span class="k">c</span>
<span class="k">FROM</span> <span class="n">Orders</span>
<span class="k">WINDOW</span> <span class="n">lastHour</span> <span class="k">AS</span> <span class="p">(</span>
<span class="k">PARTITION</span> <span class="k">BY</span> <span class="n">productId</span>
<span class="k">ORDER</span> <span class="k">BY</span> <span class="n">rowtime</span>
<span class="k">RANGE</span> <span class="n">INTERVAL</span> <span class="s1">'1'</span> <span class="n">HOUR</span> <span class="k">PRECEDING</span><span class="p">);</span></code></pre></figure>
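<p>The effect of the standing <code class="language-plaintext highlighter-rouge">UPSERT</code> can be sketched in Python, modeling <code class="language-plaintext highlighter-rouge">OrdersSummary</code> as a dictionary and assuming <code class="language-plaintext highlighter-rouge">productId</code> is its key. Each row produced by the windowed query overwrites the previous row for the same product. The helper name is illustrative, and this is not how Calcite stores the table.</p>

```python
from collections import defaultdict, deque

ORDERS = [  # (rowtime, productId, orderId, units), from the example Orders stream
    ("10:17:00", 30, 5, 4), ("10:17:05", 10, 6, 1), ("10:18:05", 20, 7, 2),
    ("10:18:07", 30, 8, 20), ("11:02:00", 10, 9, 6), ("11:04:00", 10, 10, 1),
    ("11:09:30", 40, 11, 12), ("11:24:11", 10, 12, 4),
]


def seconds(hms):
    h, m, s = map(int, hms.split(":"))
    return h * 3600 + m * 60 + s


def upsert_orders_summary(orders, summary):
    """Keep summary[productId] = COUNT(*) of that product's orders in the last hour."""
    recent = defaultdict(deque)            # productId -> rowtimes within the window
    for rowtime, product_id, order_id, units in orders:
        t = seconds(rowtime)
        window = recent[product_id]
        window.append(t)
        while window[0] < t - 3600:        # RANGE INTERVAL '1' HOUR PRECEDING
            window.popleft()
        summary[product_id] = len(window)  # UPSERT: insert or overwrite by key
    return summary


print(upsert_orders_summary(ORDERS, {}))  # {30: 2, 10: 3, 20: 1, 40: 1}
```

<p>In this sketch a product's count changes only when one of its own orders arrives, because that is when the query emits a row for it.</p>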
<h1 id="punctuation">Punctuation</h1>
<p>Punctuation[<a href="#ref5">5</a>] allows a stream query to make progress
even if there are not enough values in a monotonic key to push the results out.</p>
<p>(I prefer the term “rowtime bounds”,
and watermarks[<a href="#ref6">6</a>] are a related concept,
but for these purposes, punctuation will suffice.)</p>
<p>If a stream has punctuation enabled, it may not be sorted, but it is
nevertheless sortable. So, for the purposes of semantics, it is sufficient
to work in terms of sorted streams.</p>
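<p>A minimal Python sketch of how punctuation makes an unsorted stream sortable, assuming a made-up event format in which punctuation is a <code class="language-plaintext highlighter-rouge">("punct", bound)</code> marker interleaved with rows, and that a punctuation promises no later row has a <code class="language-plaintext highlighter-rouge">rowtime</code> at or below its bound. Rows are buffered in a min-heap; each punctuation releases, in <code class="language-plaintext highlighter-rouge">rowtime</code> order, every buffered row at or below its bound. The events are hypothetical.</p>

```python
import heapq
from itertools import count


def sort_with_punctuation(events):
    """Emit (rowtime, orderId) pairs in rowtime order using punctuation.

    Events are ("row", rowtime, orderId) or ("punct", bound).
    """
    heap, seq, bound = [], count(), None
    for event in events:
        if event[0] == "row":
            _, rowtime, order_id = event
            if bound is not None and rowtime <= bound:
                raise ValueError(f"row at {rowtime} violates punctuation {bound}")
            heapq.heappush(heap, (rowtime, next(seq), order_id))
        else:
            bound = event[1]
            while heap and heap[0][0] <= bound:  # safe: nothing earlier can still arrive
                rowtime, _, order_id = heapq.heappop(heap)
                yield rowtime, order_id


EVENTS = [("row", "10:17:05", 6), ("row", "10:17:00", 5), ("punct", "10:17:30"),
          ("row", "10:18:07", 8), ("row", "10:18:05", 7), ("punct", "10:20:00"),
          ("row", "11:02:00", 9)]  # order 9 stays buffered: no punctuation covers it
print(list(sort_with_punctuation(EVENTS)))
```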
<p>By the way, an out-of-order stream is also sortable if it is <em>t-sorted</em>
(i.e. every record is guaranteed to arrive within <em>t</em> seconds of its
timestamp) or <em>k-sorted</em> (i.e. every record is guaranteed to be no more
than <em>k</em> positions out of order). So queries on these streams can be
planned similarly to queries on streams with punctuation.</p>
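<p>Both kinds of bounded disorder can be sorted with a small buffer. The Python sketch below (hypothetical helper names, integer rowtimes) releases a row from a min-heap once no smaller rowtime can still arrive. For a <em>k</em>-sorted stream, the smallest of any <em>k</em> + 1 buffered rows is safe to emit. For a <em>t</em>-sorted stream, the sketch assumes that no row's rowtime is later than its arrival time, and uses the largest rowtime seen so far as a lower bound on the current time; any buffered row whose rowtime is at most that value minus <em>t</em> is then safe.</p>

```python
import heapq
from itertools import count


def _drain(heap, limit=None):
    """Pop rows in rowtime order while rowtime <= limit (or all rows, if None)."""
    while heap and (limit is None or heap[0][0] <= limit):
        rowtime, _, payload = heapq.heappop(heap)
        yield rowtime, payload


def sort_k_sorted(rows, k):
    """Sort (rowtime, payload) rows that are at most k positions out of order."""
    heap, seq = [], count()
    for rowtime, payload in rows:
        heapq.heappush(heap, (rowtime, next(seq), payload))
        if len(heap) > k:  # k + 1 rows buffered: the smallest is final
            rowtime, _, payload = heapq.heappop(heap)
            yield rowtime, payload
    yield from _drain(heap)


def sort_t_sorted(rows, t):
    """Sort (rowtime, payload) rows that arrive within t time units of their rowtime.

    Assumption: the largest rowtime seen so far is a lower bound on the
    current time, so no future row can have rowtime < max_seen - t.
    """
    heap, seq = [], count()
    max_seen = None
    for rowtime, payload in rows:
        heapq.heappush(heap, (rowtime, next(seq), payload))
        max_seen = rowtime if max_seen is None else max(max_seen, rowtime)
        yield from _drain(heap, max_seen - t)
    yield from _drain(heap)
```

<p>In both cases the bound (<em>k</em> or <em>t</em>) roughly determines how long a row can sit in the buffer before it is emitted.</p>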
<p>We also often want to aggregate over attributes that are not
time-based but are nevertheless monotonic. “The number of times a team
has shifted between winning-state and losing-state” is one such
monotonic attribute. The system needs to figure out for itself that it
is safe to aggregate over such an attribute; punctuation does not add
any extra information.</p>
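<p>To see why no punctuation is needed for such an attribute, consider the Python sketch below. It derives the "number of shifts" counter from a team's hypothetical sequence of winning (<code class="language-plaintext highlighter-rouge">W</code>) and losing (<code class="language-plaintext highlighter-rouge">L</code>) states, then counts rows per counter value incrementally. Because the counter never decreases, the first row with a larger value proves that the previous group is complete, so that group can be emitted at once. The data and function names are illustrative only.</p>

```python
def shift_counts(states):
    """For each state in order, the number of W/L shifts so far (non-decreasing)."""
    counts, shifts, prev = [], 0, None
    for state in states:
        if prev is not None and state != prev:
            shifts += 1
        counts.append(shifts)
        prev = state
    return counts


def group_by_monotonic(keys):
    """Streaming COUNT(*) grouped by a non-decreasing key.

    Yields (key, count) as soon as the key advances; the last group is flushed
    only when the (finite) input ends.
    """
    current, n = None, 0
    for key in keys:
        if n and key != current:
            # The key is monotonic, so the group for `current` can never grow again.
            yield current, n
            n = 0
        current = key
        n += 1
    if n:
        yield current, n


if __name__ == "__main__":
    keys = shift_counts(["W", "W", "L", "L", "L", "W"])
    print(keys)                            # [0, 0, 1, 1, 1, 2]
    print(list(group_by_monotonic(keys)))  # [(0, 2), (1, 3), (2, 1)]
```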
<p>I have in mind some metadata (cost metrics) for the planner:</p>
<ol>
<li>Is this stream sorted on a given attribute (or attributes)?</li>
<li>Is it possible to sort the stream on a given attribute? (For finite
relations, the answer is always “yes”; for streams it depends on the
existence of punctuation, or linkage between the attributes and the
sort key.)</li>
<li>What latency do we need to introduce in order to perform that sort?</li>
<li>What is the cost (in CPU, memory, etc.) of performing that sort?</li>
</ol>
<p>We already have (1), in <a href="/javadocAggregate/org/apache/calcite/rel/metadata/BuiltInMetadata.Collation.html">BuiltInMetadata.Collation</a>.
For (2), the answer is always “true” for finite relations,
but we will need to implement (2), (3) and (4) for streams.</p>
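<p>For sorting a stream on rowtime, the four questions might be answered along the lines of the following Python sketch. It is hypothetical: <code class="language-plaintext highlighter-rouge">StreamTraits</code>, <code class="language-plaintext highlighter-rouge">SortEstimate</code> and <code class="language-plaintext highlighter-rouge">estimate_rowtime_sort</code> are invented for illustration and are not part of Calcite's metadata API. It reports latency in seconds and memory in buffered rows, and leaves a value as <code class="language-plaintext highlighter-rouge">None</code> when it depends on the arrival rate or on how often punctuation arrives. Finite relations are left out, since for them the answer to (2) is always “true”.</p>

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class StreamTraits:
    """Hypothetical facts a planner might know about a stream's rowtime order."""
    sorted_on_rowtime: bool = False
    punctuated: bool = False   # punctuation bounds the rowtime of future rows
    k: Optional[int] = None    # every row is at most k positions out of order
    t: Optional[float] = None  # every row arrives within t seconds of its rowtime


@dataclass(frozen=True)
class SortEstimate:
    already_sorted: bool          # (1) is the stream sorted on rowtime?
    sortable: bool                # (2) can it be sorted on rowtime?
    latency_s: Optional[float]    # (3) delay the sort adds, if known in seconds
    buffered_rows: Optional[int]  # (4) peak rows held by the sort, if bounded


def estimate_rowtime_sort(s: StreamTraits) -> SortEstimate:
    """Answer the four planner questions for sorting a stream on rowtime."""
    if s.sorted_on_rowtime:
        return SortEstimate(True, True, 0.0, 0)
    if s.k is not None:
        # A heap of k + 1 rows; the delay in seconds depends on the arrival rate.
        return SortEstimate(False, True, None, s.k + 1)
    if s.t is not None:
        # Rows wait roughly t seconds; the row count depends on the arrival rate.
        return SortEstimate(False, True, s.t, None)
    if s.punctuated:
        # Sortable, but the cost depends on how often punctuation arrives.
        return SortEstimate(False, True, None, None)
    # No order and no bound on disorder: the sort could never emit a row.
    return SortEstimate(False, False, None, None)
```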
<h1 id="state-of-the-stream">State of the stream</h1>
<p>Not all concepts in this article have been implemented in Calcite.
Others may be implemented in Calcite but not in a particular adapter
such as SamzaSQL [<a href="#ref3">3</a>] [<a href="#ref4">4</a>].</p>
<h2 id="implemented">Implemented</h2>
<ul>
<li>Streaming <code class="language-plaintext highlighter-rouge">SELECT</code>, <code class="language-plaintext highlighter-rouge">WHERE</code>, <code class="language-plaintext highlighter-rouge">GROUP BY</code>, <code class="language-plaintext highlighter-rouge">HAVING</code>, <code class="language-plaintext highlighter-rouge">UNION ALL</code>, <code class="language-plaintext highlighter-rouge">ORDER BY</code>
</li>
<li>
<code class="language-plaintext highlighter-rouge">FLOOR</code> and <code class="language-plaintext highlighter-rouge">CEIL</code> functions</li>
<li>Monotonicity</li>
<li>Streaming <code class="language-plaintext highlighter-rouge">VALUES</code> is disallowed</li>
</ul>
<h2 id="not-implemented">Not implemented</h2>
<p>The following features are presented in this document as if Calcite
supports them, but in fact it does not (yet). Full support means
that the reference implementation supports the feature (including
negative cases) and the TCK tests it.</p>
<ul>
<li>Stream-to-stream <code class="language-plaintext highlighter-rouge">JOIN</code>
</li>
<li>Stream-to-table <code class="language-plaintext highlighter-rouge">JOIN</code>
</li>
<li>Stream on view</li>
<li>Streaming <code class="language-plaintext highlighter-rouge">UNION ALL</code> with <code class="language-plaintext highlighter-rouge">ORDER BY</code> (merge)</li>
<li>Relational query on stream</li>
<li>Streaming windowed aggregation (sliding and cascading windows)</li>
<li>Check that <code class="language-plaintext highlighter-rouge">STREAM</code> in sub-queries and views is ignored</li>
<li>Check that streaming <code class="language-plaintext highlighter-rouge">ORDER BY</code> cannot have <code class="language-plaintext highlighter-rouge">OFFSET</code> or <code class="language-plaintext highlighter-rouge">LIMIT</code>
</li>
<li>Limited history; at run time, check that there is sufficient history
to run the query.</li>
<li><a href="https://issues.apache.org/jira/browse/CALCITE-1096">Quasi-monotonicity</a></li>
<li>
<code class="language-plaintext highlighter-rouge">HOP</code> and <code class="language-plaintext highlighter-rouge">TUMBLE</code> (and auxiliary <code class="language-plaintext highlighter-rouge">HOP_START</code>, <code class="language-plaintext highlighter-rouge">HOP_END</code>,
<code class="language-plaintext highlighter-rouge">TUMBLE_START</code>, <code class="language-plaintext highlighter-rouge">TUMBLE_END</code>) functions</li>
</ul>
<h2 id="to-do-in-this-document">To do in this document</h2>
<ul>
<li>Revisit whether you can stream <code class="language-plaintext highlighter-rouge">VALUES</code>
</li>
<li>
<code class="language-plaintext highlighter-rouge">OVER</code> clause to define window on stream</li>
<li>Consider whether to allow <code class="language-plaintext highlighter-rouge">CUBE</code> and <code class="language-plaintext highlighter-rouge">ROLLUP</code> in streaming queries,
with an understanding that some levels of aggregation will never complete
(because they have no monotonic expressions) and thus will never be emitted.</li>
<li>Fix the <code class="language-plaintext highlighter-rouge">UPSERT</code> example to remove records for products that have not
occurred in the last hour.</li>
<li>DML that outputs to multiple streams; perhaps an extension to the standard
<code class="language-plaintext highlighter-rouge">REPLACE</code> statement.</li>
</ul>
<h1 id="functions">Functions</h1>
<p>The following functions are not present in standard SQL
but are defined in streaming SQL.</p>
<p>Scalar functions:</p>
<ul>
<li>
<code class="language-plaintext highlighter-rouge">FLOOR(dateTime TO intervalType)</code> rounds a date, time or timestamp value
down to a given interval type</li>
<li>
<code class="language-plaintext highlighter-rouge">CEIL(dateTime TO intervalType)</code> rounds a date, time or timestamp value
up to a given interval type</li>
</ul>
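<p>The rounding performed by <code class="language-plaintext highlighter-rouge">FLOOR</code> and <code class="language-plaintext highlighter-rouge">CEIL</code> can be sketched in Python as follows. The sketch is an illustration rather than Calcite's implementation, and it handles only the fixed-length units <code class="language-plaintext highlighter-rouge">SECOND</code>, <code class="language-plaintext highlighter-rouge">MINUTE</code>, <code class="language-plaintext highlighter-rouge">HOUR</code> and <code class="language-plaintext highlighter-rouge">DAY</code> on timestamp values; variable-length units such as <code class="language-plaintext highlighter-rouge">MONTH</code> would need calendar arithmetic.</p>

```python
from datetime import datetime, timedelta

# Fields to zero out for each unit, and the unit's length (fixed-length units only).
_ZERO = {
    "SECOND": dict(microsecond=0),
    "MINUTE": dict(second=0, microsecond=0),
    "HOUR": dict(minute=0, second=0, microsecond=0),
    "DAY": dict(hour=0, minute=0, second=0, microsecond=0),
}
_LENGTH = {
    "SECOND": timedelta(seconds=1),
    "MINUTE": timedelta(minutes=1),
    "HOUR": timedelta(hours=1),
    "DAY": timedelta(days=1),
}


def floor_to(ts: datetime, unit: str) -> datetime:
    """Like FLOOR(ts TO unit): round down to the start of the unit."""
    return ts.replace(**_ZERO[unit])


def ceil_to(ts: datetime, unit: str) -> datetime:
    """Like CEIL(ts TO unit): round up; a value already on a boundary is unchanged."""
    down = floor_to(ts, unit)
    return down if down == ts else down + _LENGTH[unit]


if __name__ == "__main__":
    ts = datetime(2015, 2, 15, 10, 17, 43)
    print(floor_to(ts, "HOUR"))  # 2015-02-15 10:00:00
    print(ceil_to(ts, "HOUR"))   # 2015-02-15 11:00:00
```

<p>Grouping a stream by <code class="language-plaintext highlighter-rouge">FLOOR(rowtime TO HOUR)</code> gives hourly buckets, and because rounding down never reverses the order of its inputs, the bucket key stays monotonic whenever <code class="language-plaintext highlighter-rouge">rowtime</code> is.</p>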
<p>Partitioning functions:</p>
<ul>
<li>
<code class="language-plaintext highlighter-rouge">HOP(t, emit, retain)</code> returns a collection of group keys for a row
to be part of a hopping window</li>
<li>
<code class="language-plaintext highlighter-rouge">HOP(t, emit, retain, align)</code> returns a collection of group keys for a row
to be part of a hopping window with a given alignment</li>
<li>
<code class="language-plaintext highlighter-rouge">TUMBLE(t, emit)</code> returns a group key for a row
to be part of a tumbling window</li>
<li>
<code class="language-plaintext highlighter-rouge">TUMBLE(t, emit, align)</code> returns a group key for a row
to be part of a tumbling window with a given alignment</li>
</ul>
<p><code class="language-plaintext highlighter-rouge">TUMBLE(t, e)</code> is equivalent to <code class="language-plaintext highlighter-rouge">TUMBLE(t, e, TIME '00:00:00')</code>.</p>
<p><code class="language-plaintext highlighter-rouge">TUMBLE(t, e, a)</code> is equivalent to <code class="language-plaintext highlighter-rouge">HOP(t, e, e, a)</code>.</p>
<p><code class="language-plaintext highlighter-rouge">HOP(t, e, r)</code> is equivalent to <code class="language-plaintext highlighter-rouge">HOP(t, e, r, TIME '00:00:00')</code>.</p>
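<p>The Python sketch below illustrates these definitions under stated assumptions: a group key is represented by the start time of its window, <code class="language-plaintext highlighter-rouge">emit</code> is the distance between consecutive window starts, <code class="language-plaintext highlighter-rouge">retain</code> is the width of each window, and the alignment is a time of day anchored on 1970-01-01. The names <code class="language-plaintext highlighter-rouge">hop</code> and <code class="language-plaintext highlighter-rouge">tumble</code> here are ordinary Python functions, not Calcite APIs, and <code class="language-plaintext highlighter-rouge">tumble</code> is written in terms of <code class="language-plaintext highlighter-rouge">hop</code> to mirror the equivalence above.</p>

```python
from datetime import datetime, time, timedelta
from typing import List

_EPOCH_DATE = datetime(1970, 1, 1).date()


def hop(t: datetime, emit: timedelta, retain: timedelta,
        align: time = time(0, 0, 0)) -> List[datetime]:
    """Start times of every hopping window [start, start + retain) containing t.

    Window starts are anchor + n * emit, where the anchor is `align` on 1970-01-01
    (an assumption of this sketch).
    """
    anchor = datetime.combine(_EPOCH_DATE, align)
    # Latest window start at or before t (floor division works for negative offsets too).
    start = anchor + ((t - anchor) // emit) * emit
    starts = []
    while start + retain > t:  # this window still contains t
        starts.append(start)
        start -= emit
    starts.reverse()           # earliest window first
    return starts


def tumble(t: datetime, emit: timedelta, align: time = time(0, 0, 0)) -> datetime:
    """Start time of the single tumbling window containing t: HOP(t, e, e, a)."""
    (start,) = hop(t, emit, emit, align)
    return start


if __name__ == "__main__":
    t = datetime(2015, 2, 15, 10, 17)
    hour = timedelta(hours=1)
    print(hop(t, hour, 3 * hour))        # windows starting at 08:00, 09:00 and 10:00
    print(tumble(t, hour, time(0, 12)))  # 2015-02-15 10:12:00
```

<p>With a one-hour <code class="language-plaintext highlighter-rouge">emit</code> and a three-hour <code class="language-plaintext highlighter-rouge">retain</code>, each row falls into three overlapping windows; when the two are equal, exactly one window contains each row, which is why a tumbling window is a special case of a hopping window.</p>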
<h1 id="references">References</h1>
<ul>
<li>[<a name="ref1">1</a>]
<a href="https://ilpubs.stanford.edu:8090/758/">Arvind Arasu, Shivnath Babu,
and Jennifer Widom (2003) The CQL Continuous Query
Language: Semantic Foundations and Query Execution</a>.</li>
<li>[<a name="ref2">2</a>]
<a href="https://kafka.apache.org/documentation.html">Apache Kafka</a>.</li>
<li>[<a name="ref3">3</a>] <a href="https://samza.apache.org">Apache Samza</a>.</li>
<li>[<a name="ref4">4</a>] <a href="https://github.com/milinda/samza-sql">SamzaSQL</a>.</li>
<li>[<a name="ref5">5</a>]
<a href="https://www.whitworth.edu/academic/department/mathcomputerscience/faculty/tuckerpeter/pdf/117896_final.pdf">Peter
A. Tucker, David Maier, Tim Sheard, and Leonidas Fegaras (2003) Exploiting
Punctuation Semantics in Continuous Data Streams</a>.</li>
<li>[<a name="ref6">6</a>]
<a href="https://research.google.com/pubs/pub41378.html">Tyler Akidau,
Alex Balikov, Kaya Bekiroglu, Slava Chernyak, Josh Haberman, Reuven Lax,
Sam McVeety, Daniel Mills, Paul Nordstrom, and Sam Whittle (2013)
MillWheel: Fault-Tolerant Stream Processing at Internet Scale</a>.</li>
</ul>
<div class="section-nav">
<div class="left align-right">
<a href="/docs/spatial.html" class="prev">Previous</a>
</div>
<div class="right align-left">
<a href="/docs/materialized_views.html" class="next">Next</a>
</div>
</div>
<div class="clear"></div>
</article>
</div>
<div class="unit one-fifth hide-on-mobiles">
<aside>
<h4>Overview</h4>
<ul>
<li class=""><a href="/docs/index.html">Background</a></li>
<li class=""><a href="/docs/tutorial.html">Tutorial</a></li>
<li class=""><a href="/docs/algebra.html">Algebra</a></li>
</ul>
<h4>Advanced</h4>
<ul>
<li class=""><a href="/docs/adapter.html">Adapters</a></li>
<li class=""><a href="/docs/spatial.html">Spatial</a></li>
<li class="current"><a href="/docs/stream.html">Streaming</a></li>
<li class=""><a href="/docs/materialized_views.html">Materialized Views</a></li>
<li class=""><a href="/docs/lattice.html">Lattices</a></li>
</ul>
<h4>Avatica</h4>
<ul>
<li class=""><a href="/docs/avatica_overview.html">Overview</a></li>
<li class=""><a href="/docs/avatica_roadmap.html">Roadmap</a></li>
<li class=""><a href="/docs/avatica_json_reference.html">JSON Reference</a></li>
<li class=""><a href="/docs/avatica_protobuf_reference.html">Protobuf Reference</a></li>
</ul>
<h4>Reference</h4>
<ul>
<li class=""><a href="/docs/reference.html">SQL language</a></li>
<li class=""><a href="/docs/model.html">JSON/YAML models</a></li>
<li class=""><a href="/docs/howto.html">HOWTO</a></li>
</ul>
<h4>Meta</h4>
<ul>
<li class=""><a href="/docs/history.html">History</a></li>
<li class=""><a href="/docs/powered_by.html">Powered by Calcite</a></li>
<li class=""><a href="/javadocAggregate">API</a></li>
</ul>
</aside>
</div>
<div class="clear"></div>
</div>
</section>
<footer role="contentinfo">
<div id="poweredby">
<a href="http://www.apache.org/">
<span class="sr-only">Apache</span>
<img src="/img/feather.png" width="190" height="77" alt="Apache Logo"></a>
</div>
<div id="copyright">
<p>The contents of this website are Copyright © 2023
<a href="https://www.apache.org/">Apache Software Foundation</a>
under the terms of
the <a href="https://www.apache.org/licenses/">
Apache License v2</a>. Apache Calcite and its logo are
trademarks of the Apache Software Foundation.
</p>
<p>
<a href="https://privacy.apache.org/policies/privacy-policy-public.html">Privacy Policy</a>
</p>
</div>
</footer>
<script>
var anchorForId = function (id) {
var anchor = document.createElement("a");
anchor.className = "header-link";
anchor.href = "#" + id;
anchor.innerHTML = "<span class=\"sr-only\">Permalink</span><i class=\"fa fa-link\"></i>";
anchor.title = "Permalink";
return anchor;
};
var linkifyAnchors = function (level, containingElement) {
var headers = containingElement.getElementsByTagName("h" + level);
for (var h = 0; h < headers.length; h++) {
var header = headers[h];
if (typeof header.id !== "undefined" && header.id !== "") {
header.appendChild(anchorForId(header.id));
}
}
};
document.onreadystatechange = function () {
if (this.readyState === "complete") {
var contentBlock = document.getElementsByClassName("docs")[0] || document.getElementsByClassName("news")[0];
if (!contentBlock) {
return;
}
for (var level = 1; level <= 6; level++) {
linkifyAnchors(level, contentBlock);
}
}
};
</script>
</body>
</html>