---
title: "SQL"
nav-parent_id: tableapi
nav-pos: 30
---

The SQL language includes Data Definition Language (DDL), queries, and Data Manipulation Language (DML). Flink has preliminary support for DDL, query, and DML features.

Before the SQL CLI existed, queries written in Flink SQL could only be embedded in a table program written in Java or Scala, and the tables accessed in the SQL query had to be registered in the TableEnvironment first. It was impossible to complete a job with SQL alone.

The SQL CLI is designed to provide an easy way to write, debug, and submit table programs without writing a single line of Java or Scala code. In the SQL CLI, DDL replaces the table definition and registration process of table programs: a SQL DDL statement passed to the SQL CLI is parsed into table objects and registered in the TableEnvironment, so that follow-up SQL queries can access these tables directly.

SQL queries are specified with the sqlQuery() method of the TableEnvironment. The method returns the result of the SQL query as a Table. A Table can be used in subsequent SQL and Table API queries, be converted into a DataSet or DataStream, or written to a TableSink. SQL and Table API queries can be seamlessly mixed and are holistically optimized and translated into a single program.

In order to access a table in a SQL query, it must be registered in the TableEnvironment. A table can be registered from a TableSource, Table, DataStream, or DataSet. Alternatively, users can also register catalogs in a TableEnvironment to specify the location of the data sources.

For convenience, Table.toString() automatically registers the table under a unique name in its TableEnvironment and returns the name. Hence, Table objects can be directly inlined into SQL queries (by string concatenation) as shown in the examples below.

Note: DDL objects are currently not persistent and cannot be shared; they exist only within a single SQL job (their life cycle is bound to the query), so only CREATE operations are supported for now (ALTER/DROP are not introduced yet). Later versions will add persistence: DDL objects will be saved into persistent catalogs, so they do not need to be declared every time and can be shared. For example, if user A creates a table T1 in one query, T1 can then be used as a data source in another query by user B. Flink's SQL support is not yet feature complete. Queries that include unsupported SQL features cause a TableException. The supported features of SQL on batch and streaming tables are listed in the following sections.

* This will be replaced by the TOC
{:toc}

Creating a Table

The following example shows how to create a table via DDL. See [Supported DDL]({{ site.baseurl }}/dev/table/supported_ddl.html) for more details.

{% highlight sql %}

-- Create a table named Orders, which has a primary key and is stored as a CSV file.
CREATE TABLE Orders (
  orderId BIGINT NOT NULL,
  customId VARCHAR NOT NULL,
  itemId BIGINT NOT NULL,
  totalPrice BIGINT NOT NULL,
  orderTime TIMESTAMP NOT NULL,
  description VARCHAR,
  PRIMARY KEY(orderId)
) WITH (
  type='csv',
  path='file://abc/csv_file1'
)

{% endhighlight %}

Creating a View

The following example shows how to create a view via DDL.

{% highlight sql %}

-- The view BigOrders computes the order count and total sales of every item in the year 2018.
CREATE VIEW BigOrders AS
SELECT itemId, COUNT(*) AS orderCount, SUM(totalPrice) AS totalSale
FROM Orders
WHERE orderTime BETWEEN '2018-01-01 00:00:00' AND '2018-12-31 23:59:59'
GROUP BY itemId

{% endhighlight %}

Creating a Function

The following example shows how to create a function via DDL.

{% highlight sql %}

-- The function myConcat references the scalar function class a.b.c.MyConcatScalarFunc,
-- which is written in Java and whose JAR file is on the compilation class path.
CREATE FUNCTION myConcat AS 'a.b.c.MyConcatScalarFunc'

{% endhighlight %}
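
Once registered, the function can be used in a query like any built-in function. The following is a minimal sketch, assuming that MyConcatScalarFunc concatenates its two VARCHAR arguments and that the Orders table from the DDL example above is available:

{% highlight sql %}
-- Hypothetical usage of the registered scalar function myConcat;
-- assumes MyConcatScalarFunc concatenates its two VARCHAR arguments.
SELECT orderId, myConcat(customId, description) AS customerInfo
FROM Orders
{% endhighlight %}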

Supported DDL Syntax

The following BNF-grammar describes the superset of supported DDL features in batch and streaming SQL.

CREATE TABLE

{% highlight sql %}

createTable:
  CREATE TABLE tableName '('
    columnDefinition [, columnDefinition ]*
    [ computedColumnDefinition [, computedColumnDefinition ]* ]
    [ tableConstraint [, tableConstraint ]* ]
    [ tableIndex [, tableIndex ]* ]
    [ WATERMARK watermarkName FOR rowtimeField AS withOffset '(' rowtimeField, offset ')' ]
  ')'
  [ WITH '(' tableOption [, tableOption ]* ')' ]

columnDefinition:
  columnName dataType [ NOT NULL ]

dataType:
  VARCHAR
  | BOOLEAN
  | TINYINT
  | SMALLINT
  | INT
  | BIGINT
  | FLOAT
  | DECIMAL [ '(' precision, scale ')' ]
  | DOUBLE
  | DATE
  | TIME
  | TIMESTAMP
  | VARBINARY

computedColumnDefinition:
  columnName AS computedColumnExpression

tableConstraint:
  { PRIMARY KEY | UNIQUE } '(' columnName [, columnName ]* ')'

tableIndex:
  [ UNIQUE ] INDEX indexName '(' columnName [, columnName ]* ')'

rowtimeField:
  columnName

tableOption:
  property=value

offset:
  positiveInteger

{% endhighlight %}

If not declared, the DECIMAL type defaults to a maximum precision and scale of (38, 18).

VARBINARY represents an array of bytes; other ARRAY types are not supported yet. The MULTISET type is also not supported.

A computed column is a virtual column that defines a calculation and is not persisted in the table. The column value is calculated at runtime, and a computed column can reference other columns in its expression.

Table constraints include PRIMARY KEY and UNIQUE constraints; the CHECK constraint is not supported yet.

A table index declares UNIQUE or non-unique index column(s).

The unit of the watermark offset is milliseconds (ms).
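
To illustrate computed columns and the watermark clause together, the following is a minimal sketch that follows the grammar above; the table name, columns, and connector options are made up for this example:

{% highlight sql %}
-- Illustrative only: the table, columns, and options are hypothetical.
CREATE TABLE ClickEvents (
  userId BIGINT NOT NULL,
  url VARCHAR,
  clickTime TIMESTAMP NOT NULL,
  -- computed (virtual) column, evaluated at runtime
  urlLength AS CHAR_LENGTH(url),
  PRIMARY KEY(userId),
  -- event-time watermark with an offset of 5000 milliseconds
  WATERMARK wk FOR clickTime AS withOffset(clickTime, 5000)
) WITH (
  type='csv',
  path='file://abc/csv_file2'
)
{% endhighlight %}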

CREATE VIEW

{% highlight sql %}

createView:
  CREATE VIEW viewName [ '(' columnName [, columnName ]* ')' ] AS queryStatement

{% endhighlight %}

A view is not materialized; it is an alias for a query statement.
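
For example, the optional column list can rename the columns of the underlying query; this sketch assumes the Orders table from the DDL example above:

{% highlight sql %}
-- Illustrative only: exposes per-item order counts under explicit column names.
CREATE VIEW ItemOrderCounts (item, orderCount) AS
SELECT itemId, COUNT(*)
FROM Orders
GROUP BY itemId
{% endhighlight %}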

CREATE FUNCTION

{% highlight sql %}

createFunction:
  CREATE FUNCTION functionName AS className

className:
  fullyQualifiedName

{% endhighlight %}

Specifying a Query

The following example shows how to specify SQL queries on registered and inlined tables.

{% highlight java %}
// ingest a DataStream from an external source
DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);

// SQL query with an inlined (unregistered) table
Table table = tableEnv.toTable(ds, "user, product, amount");
Table result = tableEnv.sqlQuery(
  "SELECT SUM(amount) FROM " + table + " WHERE product LIKE '%Rubber%'");

// SQL query with a registered table
// register the DataStream as table "Orders"
tableEnv.registerDataStream("Orders", ds, "user, product, amount");
// run a SQL query on the Table and retrieve the result as a new Table
Table result2 = tableEnv.sqlQuery(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");

// SQL update with a registered table
// create and register a TableSink
TableSink csvSink = new CsvTableSink("/path/to/file", ...);
String[] fieldNames = {"product", "amount"};
DataType[] fieldTypes = {DataTypes.STRING, DataTypes.INT};
tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, csvSink);
// run a SQL update query on the Table and emit the result to the TableSink
tableEnv.sqlUpdate(
  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
{% endhighlight %}

{% highlight scala %}
// read a DataStream from an external source
val ds: DataStream[(Long, String, Integer)] = env.addSource(...)

// SQL query with an inlined (unregistered) table
val table = ds.toTable(tableEnv, 'user, 'product, 'amount)
val result = tableEnv.sqlQuery(
  s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'")

// SQL query with a registered table
// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount)
// run a SQL query on the Table and retrieve the result as a new Table
val result2 = tableEnv.sqlQuery(
  "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")

// SQL update with a registered table
// create and register a TableSink
val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)
val fieldNames: Array[String] = Array("product", "amount")
val fieldTypes: Array[DataType[_]] = Array(DataTypes.STRING, DataTypes.INT)
tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, csvSink)
// run a SQL update query on the Table and emit the result to the TableSink
tableEnv.sqlUpdate(
  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
{% endhighlight %}

{% top %}

Supported Query Syntax

Flink parses SQL using Apache Calcite, which supports standard ANSI SQL.

The following BNF-grammar describes the superset of supported SQL features in batch and streaming queries. The Operations section shows examples for the supported features and indicates which features are only supported for batch or streaming queries.

Queries

{% highlight sql %}

query:
  values
  | {
      select
      | selectWithoutFrom
      | query UNION [ ALL ] query
      | query EXCEPT query
      | query INTERSECT query
    }
    [ ORDER BY orderItem [, orderItem ]* ]
    [ LIMIT { count | ALL } ]
    [ OFFSET start { ROW | ROWS } ]
    [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY ]

orderItem:
  expression [ ASC | DESC ]

select:
  SELECT [ ALL | DISTINCT ]
  { '*' | projectItem [, projectItem ]* }
  FROM { tableExpression | values }
  [ WHERE booleanExpression ]
  [ GROUP BY { groupItem [, groupItem ]* } ]
  [ HAVING booleanExpression ]
  [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]

selectWithoutFrom:
  SELECT [ ALL | DISTINCT ]
  { '*' | projectItem [, projectItem ]* }

projectItem:
  expression [ [ AS ] columnAlias ]
  | tableAlias . '*'

tableExpression:
  tableReference [, tableReference ]*
  | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]

joinCondition:
  ON booleanExpression
  | USING '(' column [, column ]* ')'

tableReference:
  tablePrimary
  [ matchRecognize ]
  [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]

tablePrimary:
  [ TABLE ] [ [ catalogName . ] schemaName . ] tableName [ WITH ( tableOption [, tableOption ]* ) ]
  | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
  | UNNEST '(' expression ')'

values:
  VALUES expression [, expression ]*

groupItem:
  expression
  | '(' ')'
  | '(' expression [, expression ]* ')'
  | CUBE '(' expression [, expression ]* ')'
  | ROLLUP '(' expression [, expression ]* ')'
  | GROUPING SETS '(' groupItem [, groupItem ]* ')'

windowRef:
  windowName
  | windowSpec

windowSpec:
  [ windowName ]
  '('
  [ ORDER BY orderItem [, orderItem ]* ]
  [ PARTITION BY expression [, expression ]* ]
  [
    RANGE numericOrIntervalExpression [ PRECEDING ]
    | ROWS numericExpression [ PRECEDING ]
  ]
  ')'

matchRecognize:
  MATCH_RECOGNIZE '('
  [ PARTITION BY expression [, expression ]* ]
  [ ORDER BY orderItem [, orderItem ]* ]
  [ MEASURES measureColumn [, measureColumn ]* ]
  [ ONE ROW PER MATCH ]
  [ AFTER MATCH
    ( SKIP TO NEXT ROW
    | SKIP PAST LAST ROW
    | SKIP TO FIRST variable
    | SKIP TO LAST variable
    | SKIP TO variable )
  ]
  PATTERN '(' pattern ')'
  [ WITHIN intervalLiteral ]
  DEFINE variable AS condition [, variable AS condition ]*
  ')'

measureColumn:
  expression AS alias

pattern:
  patternTerm [ '|' patternTerm ]*

patternTerm:
  patternFactor [ patternFactor ]*

patternFactor:
  variable [ patternQuantifier ]

patternQuantifier:
  '*'
  | '*?'
  | '+'
  | '+?'
  | '?'
  | '??'
  | '{' { [ minRepeat ], [ maxRepeat ] } '}' [ '?' ]
  | '{' repeat '}'

{% endhighlight %}

Table options can be specified not only in DDL but also in query statements.
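
For illustration, a minimal sketch of a query that follows the tablePrimary rule above and supplies table options inline (the option values simply repeat those of the DDL example and are not authoritative):

{% highlight sql %}
-- Illustrative only: table options passed directly in the query.
SELECT orderId, totalPrice
FROM Orders WITH (
  type='csv',
  path='file://abc/csv_file1'
)
{% endhighlight %}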

Supported DML Syntax

DML (Insert Only)

The following BNF-grammar describes the superset of supported DML features in batch and streaming queries. Currently, only the INSERT operation is supported.

{% highlight sql %}

insert:
  INSERT INTO tableReference
  query
  [ EMIT strategy [, strategy ]* ]

strategy:
  { WITH DELAY timeInterval | WITHOUT DELAY }
  [ BEFORE WATERMARK | AFTER WATERMARK ]

timeInterval:
  'string' timeUnit

{% endhighlight %}

Note: The EMIT clause is currently only valid for window queries. See the EMIT Strategy section of the group window documentation below for more details.
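
As a minimal sketch of the insert grammar combined with an EMIT clause (DailyStats is a hypothetical sink table, and Orders is assumed to have an event-time attribute rowtime):

{% highlight sql %}
-- Illustrative only: early-fire every minute before the watermark reaches the
-- end of the window, then fire again without delay for late records.
INSERT INTO DailyStats
SELECT user, TUMBLE_START(rowtime, INTERVAL '1' DAY), SUM(amount)
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user
EMIT
  WITH DELAY '1' MINUTE BEFORE WATERMARK,
  WITHOUT DELAY AFTER WATERMARK
{% endhighlight %}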

Flink SQL uses a lexical policy for identifiers (table, attribute, function names) similar to Java:

  • The case of identifiers is preserved whether or not they are quoted.
  • Afterwards, identifiers are matched case-sensitively.
  • Unlike Java, back-ticks allow identifiers to contain non-alphanumeric characters (e.g. "SELECT a AS `my field` FROM t"), as shown in the example below.
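
For example, with a hypothetical table t:

{% highlight sql %}
-- `my field` is a quoted identifier containing a space; identifiers are
-- matched case-sensitively, so t and T would refer to different tables.
SELECT a AS `my field` FROM t
{% endhighlight %}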

String literals must be enclosed in single quotes (e.g., SELECT 'Hello World'). Duplicate a single quote for escaping (e.g., SELECT 'It''s me.'). Unicode characters are supported in string literals. If explicit unicode code points are required, use the following syntax:

  • Use the backslash (\) as escaping character (default): SELECT U&'\263A'
  • Use a custom escaping character: SELECT U&'#263A' UESCAPE '#'

{% top %}

Operations

Show and Use

Scan, Projection and Filter

{% highlight sql %}
SELECT a, c AS d FROM Orders
{% endhighlight %}

Where / Filter
Batch Streaming
{% highlight sql %}
SELECT * FROM Orders WHERE b = 'red'

SELECT * FROM Orders WHERE a % 2 = 0
{% endhighlight %}

User-defined Scalar Functions (Scalar UDF)
Batch Streaming
UDFs must be registered in the TableEnvironment. See the UDF documentation for details on how to specify and register scalar UDFs.
{% highlight sql %}
SELECT PRETTY_PRINT(user) FROM Orders
{% endhighlight %}

{% top %}

Aggregations

{% highlight sql %}
SELECT COUNT(amount) OVER w, SUM(amount) OVER w
FROM Orders
WINDOW w AS (
  PARTITION BY user
  ORDER BY proctime
  ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
{% endhighlight %}

Distinct
Batch Streaming Result Updating
{% highlight sql %}
SELECT DISTINCT users FROM Orders
{% endhighlight %}
Note: For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with a valid retention interval to prevent excessive state size. See Streaming Concepts for details.

Grouping Sets, Rollup, Cube
Batch
{% highlight sql %}
SELECT SUM(amount)
FROM Orders
GROUP BY GROUPING SETS ((user), (product))
{% endhighlight %}

Having
Batch Streaming
{% highlight sql %}
SELECT SUM(amount)
FROM Orders
GROUP BY users
HAVING SUM(amount) > 50
{% endhighlight %}

User-defined Aggregate Functions (UDAGG)
Batch Streaming
UDAGGs must be registered in the TableEnvironment. See the UDF documentation for details on how to specify and register UDAGGs.
{% highlight sql %}
SELECT MyAggregate(amount)
FROM Orders
GROUP BY users
{% endhighlight %}

{% top %}

Joins

{% highlight sql %}
SELECT * FROM Orders RIGHT JOIN Product ON Orders.productId = Product.id

SELECT * FROM Orders FULL OUTER JOIN Product ON Orders.productId = Product.id
{% endhighlight %}

Time-windowed Join
Batch Streaming
Note: Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion.

A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (<, <=, >=, >), a BETWEEN predicate, or a single equality predicate that compares time attributes of the same type (i.e., processing time or event time) of both input tables.

For example, the following predicates are valid window join conditions:

  • ltime = rtime
  • ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
  • ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND

{% highlight sql %}
SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.orderId AND
      o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
{% endhighlight %}

The example above joins all orders with their corresponding shipments if the order was shipped within four hours after it was received.

Expanding arrays into a relation
Batch Streaming
Unnesting WITH ORDINALITY is not supported yet.
{% highlight sql %}
SELECT users, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
{% endhighlight %}

Join with User Defined Table Functions (UDTF)
Batch Streaming
UDTFs must be registered in the TableEnvironment. See the UDF documentation for details on how to specify and register UDTFs.

Inner Join
{% highlight sql %}
SELECT users, tag
FROM Orders, LATERAL TABLE(unnest_udtf(tags)) t AS tag
{% endhighlight %}

Left Outer Join
{% highlight sql %}
SELECT users, tag
FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) t AS tag ON TRUE
{% endhighlight %}

Join with Temporal Table Function
Currently only inner joins with temporal tables are supported.

The following example assumes that Rates is a Temporal Table Function.

{% highlight sql %}
SELECT
  o_amount, r_rate
FROM
  Orders,
  LATERAL TABLE (Rates(o_proctime))
WHERE
  r_currency = o_currency
{% endhighlight %}
For more information please check the more detailed Temporal Tables concept description.

Join with Temporal Table
Batch Streaming
Temporal Tables are tables that track changes over time. A Temporal Table provides access to the versions of a temporal table at a specific point in time.

Only inner and left joins with processing-time temporal tables are supported.

The following example assumes that LatestRates is a Temporal Table, which is usually an external database table (dimension table).

{% highlight sql %}
SELECT
  o.amount, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency
{% endhighlight %}
For more information please check the more detailed Temporal Tables concept description.

Left Semi-Join
Batch Streaming
The left semi-join is a join similar to the natural join that only returns the rows of the left table for which a match can be found in the right table. Sub-queries using IN and EXISTS will be converted to left semi-joins. Note: IN and EXISTS in a conjunctive condition are supported.
{% highlight sql %}
SELECT * FROM Orders
WHERE Orders.productId IN (SELECT Product.id FROM Product)
  AND Orders.ordertime IS NOT NULL

SELECT * FROM Orders
WHERE EXISTS (SELECT * FROM Product WHERE Orders.productId = Product.id)
  AND Orders.ordertime IS NOT NULL
{% endhighlight %}

Left Anti-Join
Batch Streaming
The left anti-join is a join similar to the left semi-join that only returns the rows of the left table for which no match can be found in the right table. Sub-queries using NOT IN and NOT EXISTS will be converted to left anti-joins. Note: NOT IN and NOT EXISTS in a conjunctive condition are supported.
{% highlight sql %}
SELECT * FROM Orders
WHERE Orders.productId NOT IN (SELECT Product.id FROM Product)
  AND Orders.ordertime IS NOT NULL

SELECT * FROM Orders
WHERE NOT EXISTS (SELECT * FROM Product WHERE Orders.productId = Product.id)
  AND Orders.ordertime IS NOT NULL
{% endhighlight %}

{% top %}

TOPN

TOPN is used to compute the largest or smallest N records of a stream. It can be expressed flexibly with an OVER window aggregation. The grammar is shown below:

{% highlight sql %}
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
      ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
  FROM table_name)
WHERE rownum <= N [AND conditions]

{% endhighlight %}

Parameter Specification

  • ROW_NUMBER(): An OVER window function that computes the row number, starting from 1.
  • PARTITION BY col1[, col2...]: Specifies the columns by which the records are partitioned.
  • ORDER BY col1 [asc|desc][, col2 [asc|desc]...]: Specifies the columns by which the records are ordered. The sort direction can differ per column.

Flink SQL sorts the input data stream according to the order key. If the top N records change, the changed records are sent downstream as retraction records. In addition, if the top N records need to be stored in external storage, the result table must be defined with a primary key.

NOTE: The usage of TOPN has some constraints. To enable Flink SQL to recognize a query as a TOPN query, the outer query must contain the clause WHERE rownum <= N; it cannot be replaced by an expression containing rownum (for example, WHERE rownum - 5 <= N). Other conditions may be added to the WHERE clause, but they can only be combined with AND, as in the example below.
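
For example, the following sketch (using the Orders table with columns category, shopId, and num that is registered in the streaming examples below) satisfies these constraints: the outer filter is exactly rownum <= N, and the extra predicate is combined with AND:

{% highlight sql %}
-- Illustrative TOPN query: top 3 records per category ordered by num,
-- with an additional AND-combined condition in the outer WHERE clause.
SELECT *
FROM (
  SELECT category, shopId, num,
    ROW_NUMBER() OVER (PARTITION BY category ORDER BY num DESC) AS rownum
  FROM Orders)
WHERE rownum <= 3 AND num > 0
{% endhighlight %}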

No Ranking Number Optimization

As stated above, the rownum field is written into the result table as part of the primary key, which causes many duplicate records to be written. For example, when the record ranked 9 is updated and its rank changes to 1, all records ranked 1 through 9 are rewritten to the result table as updates. If the result table receives too much data, it becomes the bottleneck of the SQL job.

The optimization is to discard the rownum field when writing records to the result table. This is reasonable because the number of top N records is usually small, so consumers can quickly sort the records themselves. Without the rownum field, only the changed record in the example above needs to be sent downstream, which greatly reduces the write load on the result table.

Grammar
{% highlight sql %}
SELECT col1, col2, col3
FROM (
  SELECT col1, col2, col3,
    ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
      ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
  FROM table_name)
WHERE rownum <= N [AND conditions]
{% endhighlight %}

Note: When this optimization is applied, the primary key of the result table must be consistent with the keys of the upstream GROUP BY aggregation before the TOPN operator. Otherwise, the results may be incorrect.
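
A minimal sketch of the optimized form, assuming a hypothetical result table RankedShops whose primary key matches the upstream keys (category, shopId):

{% highlight sql %}
-- Illustrative only: rownum is used for filtering but not selected,
-- so it is never written to the result table.
INSERT INTO RankedShops
SELECT category, shopId, num
FROM (
  SELECT category, shopId, num,
    ROW_NUMBER() OVER (PARTITION BY category ORDER BY num DESC) AS rownum
  FROM Orders)
WHERE rownum <= 2
{% endhighlight %}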

The following examples show how to specify SQL queries with TOPN on streaming tables.

{% highlight java %}
// ingest a DataStream from an external source
DataStream<Tuple3<String, Long, Integer>> ds = env.addSource(...);
// register the DataStream as table "Orders"
tableEnv.registerDataStream("Orders", ds, "category, shopId, num");

// select the top-2 best-selling items of every category
Table result1 = tableEnv.sqlQuery(
  "SELECT * " +
  "FROM (" +
  "  SELECT category, shopId, num, " +
  "    ROW_NUMBER() OVER (PARTITION BY category ORDER BY num DESC) as rank_num " +
  "  FROM Orders) " +
  "WHERE rank_num <= 2");
{% endhighlight %}

{% highlight scala %}
// read a DataStream from an external source
val ds: DataStream[(String, Long, Int)] = env.addSource(...)
// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'category, 'shopId, 'num)

// select the top-2 best-selling items of every category
val result1 = tableEnv.sqlQuery(
  """
    |SELECT *
    |FROM (
    |  SELECT category, shopId, num,
    |    ROW_NUMBER() OVER (PARTITION BY category ORDER BY num DESC) as rank_num
    |  FROM Orders)
    |WHERE rank_num <= 2
  """.stripMargin)
{% endhighlight %}

{% top %}

Set Operations

Intersect / Except
Batch
{% highlight sql %}
SELECT *
FROM (
  (SELECT user FROM Orders WHERE a % 2 = 0)
  INTERSECT
  (SELECT user FROM Orders WHERE b = 0)
)
{% endhighlight %}

{% highlight sql %}
SELECT *
FROM (
  (SELECT user FROM Orders WHERE a % 2 = 0)
  EXCEPT
  (SELECT user FROM Orders WHERE b = 0)
)
{% endhighlight %}

In
Batch
Returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression.
{% highlight sql %}
SELECT user, amount
FROM Orders
WHERE product IN (
  SELECT product FROM NewProducts
)
{% endhighlight %}

{% top %}

OrderBy & Limit

{% highlight sql %}
-- sort by time attribute
SELECT *
FROM Orders
ORDER BY orderTime

-- universal sort
SELECT *
FROM Orders
ORDER BY orderId DESC
{% endhighlight %}

Limit
Batch Streaming
{% highlight sql %}
SELECT *
FROM Orders
LIMIT 3
{% endhighlight %}

{% top %}

Insert

{% highlight sql %}
INSERT INTO OutputTable
SELECT users, tag
FROM Orders
{% endhighlight %}

{% top %}

Group Windows

Group windows are defined in the GROUP BY clause of a SQL query. Just like queries with regular GROUP BY clauses, queries with a GROUP BY clause that includes a group window function compute a single result row per group. The following group window functions are supported for SQL on batch and streaming tables.

Time Attributes

For SQL queries on streaming tables, the time_attr argument of the group window function must refer to a valid time attribute that specifies the processing time or event time of rows. See the documentation of time attributes to learn how to define time attributes.

For SQL on batch tables, the time_attr argument of the group window function must be an attribute of type TIMESTAMP.

Selecting Group Window Start and End Timestamps

The start and end timestamps of group windows, as well as time attributes, can be selected with auxiliary functions such as TUMBLE_START, TUMBLE_END, SESSION_START, and SESSION_ROWTIME.

Note: Auxiliary functions must be called with exactly the same arguments as the group window function in the GROUP BY clause.
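
For example, a tumbling window aggregation that selects the window start and end must pass the same arguments to the auxiliary functions as to the TUMBLE function in the GROUP BY clause; this sketch assumes the Orders table with a rowtime attribute used in the examples below:

{% highlight sql %}
-- TUMBLE_START and TUMBLE_END are called with exactly the same arguments
-- as the TUMBLE group window function in the GROUP BY clause.
SELECT
  user,
  TUMBLE_START(rowtime, INTERVAL '1' DAY) AS wStart,
  TUMBLE_END(rowtime, INTERVAL '1' DAY) AS wEnd,
  SUM(amount)
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user
{% endhighlight %}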

EMIT Strategy

The desired EMIT behavior (such as the allowed latency) of an aggregation result varies between streaming SQL scenarios. For example, users may want to get the newest result every minute before the end of a 1-hour tumbling window, and to wait for late data for 1 day after the end of the window. Such requirements are not supported by conventional ANSI SQL, so the Flink SQL grammar is extended with the EMIT strategy.

The EMIT strategy serves two purposes:

  1. Controlling latency: setting a firing frequency before the end of "big" windows so that users get the newest results in time.
  2. Data accuracy: waiting for late data for a specified time and updating the window result when late data arrives.

The Maximum Allowed Lateness

When the AFTER strategy is used, it is necessary to configure how long late data is waited for. Flink SQL provides the parameter sql.exec.state.ttl.ms to indicate the maximum allowed lateness. For example, sql.exec.state.ttl.ms=3600000 means that data arriving more than 1 hour after the end of the window is discarded.
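
Putting the two pieces together, the following sketch waits for late data after the window has fired at the watermark; it assumes the Orders table with a rowtime attribute and a lateness configured via the parameter above:

{% highlight sql %}
-- Illustrative only: fire at the watermark and re-fire without delay for
-- late records, which are accepted up to the configured allowed lateness.
SELECT user, TUMBLE_START(rowtime, INTERVAL '1' HOUR) AS wStart, SUM(amount)
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), user
EMIT WITHOUT DELAY AFTER WATERMARK
{% endhighlight %}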

The following examples show how to specify SQL queries with group windows and the EMIT strategy on streaming tables.

{% highlight java %}
// ingest a DataStream from an external source
DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);
// register the DataStream as table "Orders"
tableEnv.registerDataStream("Orders", ds, "user, product, amount, proctime.proctime, rowtime.rowtime");

// compute SUM(amount) per day (in event-time)
Table result1 = tableEnv.sqlQuery(
  "SELECT user, " +
  "  TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart, " +
  "  SUM(amount) FROM Orders " +
  "GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user");

// compute SUM(amount) per day (in processing-time)
Table result2 = tableEnv.sqlQuery(
  "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), user");

// compute every hour the SUM(amount) of the last 24 hours in event-time
Table result3 = tableEnv.sqlQuery(
  "SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product");

// compute SUM(amount) per session with a 12 hour inactivity gap (in event-time)
Table result4 = tableEnv.sqlQuery(
  "SELECT user, " +
  "  SESSION_START(rowtime, INTERVAL '12' HOUR) AS sStart, " +
  "  SESSION_ROWTIME(rowtime, INTERVAL '12' HOUR) AS snd, " +
  "  SUM(amount) " +
  "FROM Orders " +
  "GROUP BY SESSION(rowtime, INTERVAL '12' HOUR), user");

// ==== the following examples show the usage of the EMIT grammar to enable early-fire and late-fire of windows ====

// emit the window result every 1 minute before the end of the window
Table result5 = tableEnv.sqlQuery(
  "SELECT user, " +
  "  TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart, " +
  "  SUM(amount) FROM Orders " +
  "GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user " +
  "EMIT WITH DELAY '1' MINUTE BEFORE WATERMARK");

// emit the window result without latency when elements arrive after the end of the window
Table result6 = tableEnv.sqlQuery(
  "SELECT user, " +
  "  TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart, " +
  "  SUM(amount) FROM Orders " +
  "GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user " +
  "EMIT WITHOUT DELAY AFTER WATERMARK");

// emit the window result every 1 minute globally
Table result7 = tableEnv.sqlQuery(
  "SELECT user, " +
  "  TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart, " +
  "  SUM(amount) FROM Orders " +
  "GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user " +
  "EMIT WITH DELAY '1' MINUTE");
{% endhighlight %}

{% highlight scala %}
// read a DataStream from an external source
val ds: DataStream[(Long, String, Int)] = env.addSource(...)
// register the DataStream under the name "Orders"
tableEnv.registerDataStream("Orders", ds, 'user, 'product, 'amount, 'proctime.proctime, 'rowtime.rowtime)

// compute SUM(amount) per day (in event-time)
val result1 = tableEnv.sqlQuery(
  """
    |SELECT
    |  user,
    |  TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart,
    |  SUM(amount)
    |FROM Orders
    |GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user
  """.stripMargin)

// compute SUM(amount) per day (in processing-time)
val result2 = tableEnv.sqlQuery(
  "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), user")

// compute every hour the SUM(amount) of the last 24 hours in event-time
val result3 = tableEnv.sqlQuery(
  "SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product")

// compute SUM(amount) per session with a 12 hour inactivity gap (in event-time)
val result4 = tableEnv.sqlQuery(
  """
    |SELECT
    |  user,
    |  SESSION_START(rowtime, INTERVAL '12' HOUR) AS sStart,
    |  SESSION_END(rowtime, INTERVAL '12' HOUR) AS sEnd,
    |  SUM(amount)
    |FROM Orders
    |GROUP BY SESSION(rowtime, INTERVAL '12' HOUR), user
  """.stripMargin)

// ==== the following examples show the usage of the EMIT grammar to enable early-fire and late-fire of windows ====

// emit the window result every 1 minute before the end of the window
val result5 = tableEnv.sqlQuery(
  """
    |SELECT
    |  user,
    |  SESSION_START(rowtime, INTERVAL '12' HOUR) AS sStart,
    |  SESSION_END(rowtime, INTERVAL '12' HOUR) AS sEnd,
    |  SUM(amount)
    |FROM Orders
    |GROUP BY SESSION(rowtime, INTERVAL '12' HOUR), user
    |EMIT WITH DELAY '1' MINUTE BEFORE WATERMARK
  """.stripMargin)

// emit the window result without latency when elements arrive after the end of the window
val result6 = tableEnv.sqlQuery(
  """
    |SELECT
    |  user,
    |  SESSION_START(rowtime, INTERVAL '12' HOUR) AS sStart,
    |  SESSION_END(rowtime, INTERVAL '12' HOUR) AS sEnd,
    |  SUM(amount)
    |FROM Orders
    |GROUP BY SESSION(rowtime, INTERVAL '12' HOUR), user
    |EMIT WITHOUT DELAY AFTER WATERMARK
  """.stripMargin)

// emit the window result every 1 minute globally
val result7 = tableEnv.sqlQuery(
  """
    |SELECT
    |  user,
    |  SESSION_START(rowtime, INTERVAL '12' HOUR) AS sStart,
    |  SESSION_END(rowtime, INTERVAL '12' HOUR) AS sEnd,
    |  SUM(amount)
    |FROM Orders
    |GROUP BY SESSION(rowtime, INTERVAL '12' HOUR), user
    |EMIT WITH DELAY '1' MINUTE
  """.stripMargin)
{% endhighlight %}

{% top %}

Pattern Recognition

{% highlight sql %}
SELECT T.aid, T.bid, T.cid
FROM MyTable
MATCH_RECOGNIZE (
  PARTITION BY userid
  ORDER BY proctime
  MEASURES
    A.id AS aid,
    B.id AS bid,
    C.id AS cid
  PATTERN (A B C)
  DEFINE
    A AS name = 'a',
    B AS name = 'b',
    C AS name = 'c'
) AS T
{% endhighlight %}

{% top %}

Data Types

The SQL runtime is built on top of Flink's DataStream APIs. Internally, it uses the table's InternalType to define data types. Fully supported types are listed in org.apache.flink.table.api.Types. The following table summarizes the relation between SQL types, Table API types, and the resulting Java class.

| Table API | SQL | Java type |
|-----------|-----|-----------|
| DataTypes.STRING | VARCHAR | java.lang.String |
| DataTypes.BOOLEAN | BOOLEAN | java.lang.Boolean |
| DataTypes.BYTE | TINYINT | java.lang.Byte |
| DataTypes.SHORT | SMALLINT | java.lang.Short |
| DataTypes.INT | INTEGER, INT | java.lang.Integer |
| DataTypes.LONG | BIGINT | java.lang.Long |
| DataTypes.FLOAT | REAL, FLOAT | java.lang.Float |
| DataTypes.DOUBLE | DOUBLE | java.lang.Double |
| DataTypes.DECIMAL | DECIMAL | java.math.BigDecimal |
| DataTypes.SQL_DATE | DATE | java.sql.Date |
| DataTypes.SQL_TIME | TIME | java.sql.Time |
| DataTypes.SQL_TIMESTAMP | TIMESTAMP(3) | java.sql.Timestamp |
| DataTypes.INTERVAL_MONTHS | INTERVAL YEAR TO MONTH | java.lang.Integer |
| DataTypes.INTERVAL_MILLIS | INTERVAL DAY TO SECOND(3) | java.lang.Long |
| DataTypes.PRIMITIVE_ARRAY | ARRAY | e.g. int[] |
| DataTypes.OBJECT_ARRAY | ARRAY | e.g. java.lang.Byte[] |
| DataTypes.MAP | MAP | java.util.HashMap |
| DataTypes.MULTISET | MULTISET | e.g. `java.util.HashMap<String, Integer>` for a multiset of String |

Generic types and composite types (e.g., POJOs or Tuples) can be fields of a row as well. Generic types are treated as a black box and can be passed on or processed by user-defined functions. Composite types can be accessed with built-in functions (see Value access functions section).
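
As a hedged illustration of accessing composite fields in SQL (the table and field names are made up for this example), a nested row field can be flattened with the star syntax listed under the value access functions below, or read with dot notation:

{% highlight sql %}
-- Illustrative only: address is assumed to be a composite (POJO/row) field.
SELECT
  c.address.*,       -- flatten all sub-fields of the composite field
  c.address.city     -- access a single sub-field
FROM Customers AS c
{% endhighlight %}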

{% top %}

Built-In Functions

Flink's SQL support comes with a set of built-in functions for data transformations. This section gives a brief overview of the available functions.

The Flink SQL functions (including their syntax) are a subset of Apache Calcite's built-in functions. Most of the documentation has been adopted from the Calcite SQL reference.

Comparison functions

| Function | Description |
|----------|-------------|
| value1 <> value2 | Not equal. |
| value1 > value2 | Greater than. |
| value1 >= value2 | Greater than or equal. |
| value1 < value2 | Less than. |
| value1 <= value2 | Less than or equal. |
| value IS NULL | Returns TRUE if value is null. |
| value IS NOT NULL | Returns TRUE if value is not null. |
| value1 IS DISTINCT FROM value2 | Returns TRUE if two values are not equal, treating null values as the same. |
| value1 IS NOT DISTINCT FROM value2 | Returns TRUE if two values are equal, treating null values as the same. |
| value1 BETWEEN [ASYMMETRIC \| SYMMETRIC] value2 AND value3 | Returns TRUE if value1 is greater than or equal to value2 and less than or equal to value3. |
| value1 NOT BETWEEN value2 AND value3 | Returns TRUE if value1 is less than value2 or greater than value3. |
| string1 LIKE string2 [ ESCAPE string3 ] | Returns TRUE if string1 matches pattern string2. An escape character can be defined if necessary. |
| string1 NOT LIKE string2 [ ESCAPE string3 ] | Returns TRUE if string1 does not match pattern string2. An escape character can be defined if necessary. |
| string1 SIMILAR TO string2 [ ESCAPE string3 ] | Returns TRUE if string1 matches regular expression string2. An escape character can be defined if necessary. |
| string1 NOT SIMILAR TO string2 [ ESCAPE string3 ] | Returns TRUE if string1 does not match regular expression string2. An escape character can be defined if necessary. |
| value IN (value [, value]* ) | Returns TRUE if an expression exists in a given list of expressions. This is a shorthand for multiple OR conditions. If the testing set contains NULL, the result is NULL if the element cannot be found and TRUE if it can be found. If the element is NULL, the result is always NULL. E.g. "42 IN (1, 2, 3)" leads to FALSE. |
| value NOT IN (value [, value]* ) | Returns TRUE if value is not equal to every value in a list. |
| EXISTS (sub-query) | Returns TRUE if sub-query returns at least one row. Only supported if the operation can be rewritten in a join and group operation. |
| value IN (sub-query) | Returns TRUE if value is equal to a row returned by sub-query. This operation is not supported in a streaming environment yet. |
| value NOT IN (sub-query) | Returns TRUE if value is not equal to every row returned by sub-query. This operation is not supported in a streaming environment yet. |

Logical functions

| Function | Description |
|----------|-------------|
| boolean1 AND boolean2 | Returns TRUE if boolean1 and boolean2 are both TRUE. Supports three-valued logic. |
| NOT boolean | Returns TRUE if boolean is not TRUE; returns UNKNOWN if boolean is UNKNOWN. |
| boolean IS FALSE | Returns TRUE if boolean is FALSE; returns FALSE if boolean is UNKNOWN. |
| boolean IS NOT FALSE | Returns TRUE if boolean is not FALSE; returns TRUE if boolean is UNKNOWN. |
| boolean IS TRUE | Returns TRUE if boolean is TRUE; returns FALSE if boolean is UNKNOWN. |
| boolean IS NOT TRUE | Returns TRUE if boolean is not TRUE; returns TRUE if boolean is UNKNOWN. |
| boolean IS UNKNOWN | Returns TRUE if boolean is UNKNOWN. |
| boolean IS NOT UNKNOWN | Returns TRUE if boolean is not UNKNOWN. |

Arithmetic functions

| Function | Description |
|----------|-------------|
| - numeric | Returns negative numeric. |
| numeric1 + numeric2 | Returns numeric1 plus numeric2. |
| numeric1 - numeric2 | Returns numeric1 minus numeric2. |
| numeric1 * numeric2 | Returns numeric1 multiplied by numeric2. |
| numeric1 / numeric2 | Returns numeric1 divided by numeric2. |
| POWER(numeric1, numeric2) | Returns numeric1 raised to the power of numeric2. |
| ABS(numeric) | Returns the absolute value of numeric. |
| MOD(numeric1, numeric2) | Returns the remainder (modulus) of numeric1 divided by numeric2. The result is negative only if numeric1 is negative. |
| SQRT(numeric) | Returns the square root of numeric. |
| LN(numeric) | Returns the natural logarithm (base e) of numeric. |
| LOG10(numeric) | Returns the base 10 logarithm of numeric. |
| LOG2(numeric) | Returns the base 2 logarithm of numeric. |
| EXP(numeric) | Returns e raised to the power of numeric. |
| CEIL(numeric), CEILING(numeric) | Rounds numeric up, and returns the smallest number that is greater than or equal to numeric. |
| FLOOR(numeric) | Rounds numeric down, and returns the largest number that is less than or equal to numeric. |
| SIN(numeric) | Calculates the sine of a given number. |
| COS(numeric) | Calculates the cosine of a given number. |
| TAN(numeric) | Calculates the tangent of a given number. |
| COT(numeric) | Calculates the cotangent of a given number. |
| ASIN(numeric) | Calculates the arc sine of a given number. |
| ACOS(numeric) | Calculates the arc cosine of a given number. |
| SINH(numeric) | Calculates the hyperbolic sine of a given number. |
| ATAN(numeric) | Calculates the arc tangent of a given number. |
| TANH(numeric) | Calculates the hyperbolic tangent of a given number. |
| DEGREES(numeric) | Converts numeric from radians to degrees. |
| RADIANS(numeric) | Converts numeric from degrees to radians. |
| SIGN(numeric) | Calculates the signum of a given number. |
| ROUND(numeric, int) | Rounds the given number to int places right of the decimal point. |
| PI() | Returns a value that is closer than any other value to pi. |
| E() | Returns a value that is closer than any other value to e. |
| RAND() | Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive). |
| RAND(seed integer) | Returns a pseudorandom double value between 0.0 (inclusive) and 1.0 (exclusive) with an initial seed. Two RAND functions will return identical sequences of numbers if they have the same initial seed. |
| RAND_INTEGER(bound integer) | Returns a pseudorandom integer value between 0 (inclusive) and the specified bound (exclusive). |
| RAND_INTEGER(seed integer, bound integer) | Returns a pseudorandom integer value between 0 (inclusive) and the specified bound (exclusive) with an initial seed. Two RAND_INTEGER functions will return identical sequences of numbers if they have the same initial seed and bound. |
| LOG(x numeric), LOG(base numeric, x numeric) | Returns the natural logarithm of a number, or the logarithm of a number to a specified base. If called with one parameter, this function returns the natural logarithm of x. If called with two parameters, it returns the logarithm of x to the given base. x must be greater than 0. The base must be greater than 1. |
| BIN(numeric) | Returns a string representation of an integer numeric value in binary format. Returns null if numeric is null. E.g. 4 leads to "100", 12 leads to "1100". |

String functions

| Function | Description |
|----------|-------------|
| CHAR_LENGTH(string) | Returns the number of characters in a character string. |
| CHARACTER_LENGTH(string) | As CHAR_LENGTH(string). |
| UPPER(string) | Returns a character string converted to upper case. |
| LOWER(string) | Returns a character string converted to lower case. |
| POSITION(string1 IN string2) | Returns the position of the first occurrence of string1 in string2. |
| TRIM( { BOTH \| LEADING \| TRAILING } string1 FROM string2) | Removes leading and/or trailing characters from string2. By default, whitespace at both ends is removed. |
| OVERLAY(string1 PLACING string2 FROM integer [ FOR integer2 ]) | Replaces a substring of string1 with string2. |
| SUBSTRING(string FROM integer) | Returns a substring of a character string starting at a given point. |
| SUBSTRING(string FROM integer FOR integer) | Returns a substring of a character string starting at a given point with a given length. |
| INITCAP(string) | Returns string with the first letter of each word converted to upper case and the rest to lower case. Words are sequences of alphanumeric characters separated by non-alphanumeric characters. |
| CONCAT(string1, string2, ...) | Returns the string that results from concatenating the arguments. Returns NULL if any argument is NULL. E.g. CONCAT("AA", "BB", "CC") returns AABBCC. |
| CONCAT_WS(separator, string1, string2, ...) | Returns the string that results from concatenating the arguments using a separator. The separator is added between the strings to be concatenated. Returns NULL if the separator is NULL. CONCAT_WS() does not skip empty strings, but it does skip any NULL argument. E.g. CONCAT_WS("~", "AA", "BB", "", "CC") returns AA~BB~~CC. |
| LPAD(text string, len integer, pad string) | Returns the string text left-padded with the string pad to a length of len characters. If text is longer than len, the return value is shortened to len characters. E.g. LPAD('hi', 4, '??') returns ??hi, LPAD('hi', 1, '??') returns h. |
| RPAD(text string, len integer, pad string) | Returns the string text right-padded with the string pad to a length of len characters. If text is longer than len, the return value is shortened to len characters. E.g. RPAD('hi', 4, '??') returns hi??, RPAD('hi', 1, '??') returns h. |

Conditional functions

| Function | Description |
|----------|-------------|
| CASE WHEN condition1 THEN result1 [ WHEN conditionN THEN resultN ]* [ ELSE resultZ ] END | Searched case. |
| NULLIF(value, value) | Returns NULL if the values are the same. For example, NULLIF(5, 5) returns NULL; NULLIF(5, 0) returns 5. |
| COALESCE(value, value [, value ]* ) | Provides a value if the first value is null. For example, COALESCE(NULL, 5) returns 5. |

Temporal functions

| Function | Description |
|----------|-------------|
| TIME string | Parses a time string in the form "hh:mm:ss" to a SQL time. |
| TIMESTAMP string | Parses a timestamp string in the form "yy-mm-dd hh:mm:ss.fff" to a SQL timestamp. |
| INTERVAL string range | Parses an interval string in the form "dd hh:mm:ss.fff" for SQL intervals of milliseconds or "yyyy-mm" for SQL intervals of months. An interval range might be e.g. DAY, MINUTE, DAY TO HOUR, or DAY TO SECOND for intervals of milliseconds; YEAR or YEAR TO MONTH for intervals of months. E.g. INTERVAL '10 00:00:00.004' DAY TO SECOND, INTERVAL '10' DAY, or INTERVAL '2-10' YEAR TO MONTH return intervals. |
| CURRENT_DATE | Returns the current SQL date in UTC time zone. |
| CURRENT_TIME | Returns the current SQL time in UTC time zone. |
| CURRENT_TIMESTAMP | Returns the current SQL timestamp in UTC time zone. |
| LOCALTIME | Returns the current SQL time in local time zone. |
| LOCALTIMESTAMP | Returns the current SQL timestamp in local time zone. |
| EXTRACT(timeintervalunit FROM temporal) | Extracts parts of a time point or time interval. Returns the part as a long value. E.g. EXTRACT(DAY FROM DATE '2006-06-05') leads to 5. |
| FLOOR(timepoint TO timeintervalunit) | Rounds a time point down to the given unit. E.g. FLOOR(TIME '12:44:31' TO MINUTE) leads to 12:44:00. |
| CEIL(timepoint TO timeintervalunit) | Rounds a time point up to the given unit. E.g. CEIL(TIME '12:44:31' TO MINUTE) leads to 12:45:00. |
| QUARTER(date) | Returns the quarter of a year from a SQL date. E.g. QUARTER(DATE '1994-09-27') leads to 3. |
| (timepoint, temporal) OVERLAPS (timepoint, temporal) | Determines whether two anchored time intervals overlap. Time point and temporal are transformed into a range defined by two time points (start, end). The function evaluates leftEnd >= rightStart && rightEnd >= leftStart. E.g. (TIME '2:55:00', INTERVAL '1' HOUR) OVERLAPS (TIME '3:30:00', INTERVAL '2' HOUR) leads to true; (TIME '9:00:00', TIME '10:00:00') OVERLAPS (TIME '10:15:00', INTERVAL '3' HOUR) leads to false. |
| DATE_FORMAT(timestamp, format) | Formats timestamp as a string using a specified format string. The format must be compatible with MySQL's date formatting syntax as used by the date_parse function. The format specification is given in the Date Format Specifier table below. For example DATE_FORMAT(ts, '%Y, %d %M') results in strings formatted as "2017, 05 May". |
| TIMESTAMPADD(unit, interval, timestamp) | Adds a (signed) integer interval to a timestamp. The unit for the interval is given by the unit argument, which should be one of the following values: SECOND, MINUTE, HOUR, DAY, WEEK, MONTH, QUARTER, or YEAR. E.g. TIMESTAMPADD(WEEK, 1, '2003-01-02') leads to 2003-01-09. |

Aggregate functions

| Function | Description |
|----------|-------------|
| COUNT(*) | Returns the number of input rows. |
| AVG(numeric) | Returns the average (arithmetic mean) of numeric across all input values. |
| SUM(numeric) | Returns the sum of numeric across all input values. |
| MAX(value) | Returns the maximum value of value across all input values. |
| MIN(value) | Returns the minimum value of value across all input values. |
| STDDEV_POP(value) | Returns the population standard deviation of the numeric field across all input values. |
| VAR_POP(value) | Returns the population variance (square of the population standard deviation) of the numeric field across all input values. |
| VAR_SAMP(value) | Returns the sample variance (square of the sample standard deviation) of the numeric field across all input values. |
| COLLECT(value) | Returns a multiset of the values. Null input values are ignored. Returns an empty multiset if only null values are added. |
| concat_agg(value, sep) | Concatenates all the column values into one string, ignoring null values. A separator can be specified. |
| first_value() | Returns the first non-null value of a group; returns null if all values are null. |
| last_value() | Returns the last non-null value of a group; returns null if all values are null. |

Grouping functions

| Function | Description |
|----------|-------------|
| GROUPING(expression) | Returns 1 if expression is rolled up in the current row's grouping set, 0 otherwise. |
| GROUPING_ID(expression [, expression]* ) | Returns a bit vector of the given grouping expressions. |

Value access functions

| Function | Description |
|----------|-------------|
| tableName.compositeType.* | Converts a Flink composite type (such as Tuple, POJO, etc.) and all of its direct subtypes into a flat representation where every subtype is a separate field. |

Value constructor functions

| Function | Description |
|----------|-------------|
| (value [, value]* ) | Creates a row from a list of values. |
| ROW(value [, value]* ) | Creates a row from a list of values. |
| ARRAY '[' value [, value ]* ']' | Creates an array from a list of values. |
| MAP '[' key, value [, key, value ]* ']' | Creates a map from a list of key-value pairs. |

Array and map functions

| Function | Description |
|----------|-------------|
| CARDINALITY(ARRAY) | Returns the number of elements of an array. |
| array '[' index ']' | Returns the element at a particular position in an array. The index starts at 1. |
| ELEMENT(ARRAY) | Returns the sole element of an array with a single element. Returns null if the array is empty. Throws an exception if the array has more than one element. |
| CARDINALITY(MAP) | Returns the number of entries of a map. |
| map '[' key ']' | Returns the value specified by a particular key in a map. |

Hash functions

| Function | Description |
|----------|-------------|
| SHA1(string) | Returns the SHA-1 hash of the string argument as a string of 40 hexadecimal digits; null if string is null. |
| SHA256(string) | Returns the SHA-256 hash of the string argument as a string of 64 hexadecimal digits; null if string is null. |

Unsupported Functions

The following functions are not supported yet:

  • Binary string operators and functions
  • System functions

{% top %}

Reserved Keywords

Although not every SQL feature is implemented yet, some string combinations are already reserved as keywords for future use. If you want to use one of the following strings as a field name, make sure to surround them with backticks (e.g. `value`, `count`).

{% highlight sql %}

A, ABS, ABSOLUTE, ACTION, ADA, ADD, ADMIN, AFTER, ALL, ALLOCATE, ALLOW, ALTER, ALWAYS, AND, ANY, ARE, ARRAY, AS, ASC, ASENSITIVE, ASSERTION, ASSIGNMENT, ASYMMETRIC, AT, ATOMIC, ATTRIBUTE, ATTRIBUTES, AUTHORIZATION, AVG, BEFORE, BEGIN, BERNOULLI, BETWEEN, BIGINT, BINARY, BIT, BLOB, BOOLEAN, BOTH, BREADTH, BY, C, CALL, CALLED, CARDINALITY, CASCADE, CASCADED, CASE, CAST, CATALOG, CATALOG_NAME, CEIL, CEILING, CENTURY, CHAIN, CHAR, CHARACTER, CHARACTERISTICTS, CHARACTERS, CHARACTER_LENGTH, CHARACTER_SET_CATALOG, CHARACTER_SET_NAME, CHARACTER_SET_SCHEMA, CHAR_LENGTH, CHECK, CLASS_ORIGIN, CLOB, CLOSE, COALESCE, COBOL, COLLATE, COLLATION, COLLATION_CATALOG, COLLATION_NAME, COLLATION_SCHEMA, COLLECT, COLUMN, COLUMN_NAME, COMMAND_FUNCTION, COMMAND_FUNCTION_CODE, COMMIT, COMMITTED, CONDITION, CONDITION_NUMBER, CONNECT, CONNECTION, CONNECTION_NAME, CONSTRAINT, CONSTRAINTS, CONSTRAINT_CATALOG, CONSTRAINT_NAME, CONSTRAINT_SCHEMA, CONSTRUCTOR, CONTAINS, CONTINUE, CONVERT, CORR, CORRESPONDING, COUNT, COVAR_POP, COVAR_SAMP, CREATE, CROSS, CUBE, CUME_DIST, CURRENT, CURRENT_CATALOG, CURRENT_DATE, CURRENT_DEFAULT_TRANSFORM_GROUP, CURRENT_PATH, CURRENT_ROLE, CURRENT_SCHEMA, CURRENT_TIME, CURRENT_TIMESTAMP, CURRENT_TRANSFORM_GROUP_FOR_TYPE, CURRENT_USER, CURSOR, CURSOR_NAME, CYCLE, DATA, DATABASE, DATE, DATETIME_INTERVAL_CODE, DATETIME_INTERVAL_PRECISION, DAY, DEALLOCATE, DEC, DECADE, DECIMAL, DECLARE, DEFAULT, DEFAULTS, DEFERRABLE, DEFERRED, DEFINED, DEFINER, DEGREE, DELETE, DENSE_RANK, DEPTH, DEREF, DERIVED, DESC, DESCRIBE, DESCRIPTION, DESCRIPTOR, DETERMINISTIC, DIAGNOSTICS, DISALLOW, DISCONNECT, DISPATCH, DISTINCT, DOMAIN, DOUBLE, DOW, DOY, DROP, DYNAMIC, DYNAMIC_FUNCTION, DYNAMIC_FUNCTION_CODE, EACH, ELEMENT, ELSE, END, END-EXEC, EPOCH, EQUALS, ESCAPE, EVERY, EXCEPT, EXCEPTION, EXCLUDE, EXCLUDING, EXEC, EXECUTE, EXISTS, EXP, EXPLAIN, EXTEND, EXTERNAL, EXTRACT, FALSE, FETCH, FILTER, FINAL, FIRST, FIRST_VALUE, FLOAT, FLOOR, FOLLOWING, FOR, FOREIGN, FORTRAN, FOUND, FRAC_SECOND, FREE, FROM, FULL, FUNCTION, FUSION, G, GENERAL, GENERATED, GET, GLOBAL, GO, GOTO, GRANT, GRANTED, GROUP, GROUPING, HAVING, HIERARCHY, HOLD, HOUR, IDENTITY, IMMEDIATE, IMPLEMENTATION, IMPORT, IN, INCLUDING, INCREMENT, INDICATOR, INITIALLY, INNER, INOUT, INPUT, INSENSITIVE, INSERT, INSTANCE, INSTANTIABLE, INT, INTEGER, INTERSECT, INTERSECTION, INTERVAL, INTO, INVOKER, IS, ISOLATION, JAVA, JOIN, K, KEY, KEY_MEMBER, KEY_TYPE, LABEL, LANGUAGE, LARGE, LAST, LAST_VALUE, LATERAL, LEADING, LEFT, LENGTH, LEVEL, LIBRARY, LIKE, LIMIT, LN, LOCAL, LOCALTIME, LOCALTIMESTAMP, LOCATOR, LOWER, M, MAP, MATCH, MATCHED, MAX, MAXVALUE, MEMBER, MERGE, MESSAGE_LENGTH, MESSAGE_OCTET_LENGTH, MESSAGE_TEXT, METHOD, MICROSECOND, MILLENNIUM, MIN, MINUTE, MINVALUE, MOD, MODIFIES, MODULE, MONTH, MORE, MULTISET, MUMPS, NAME, NAMES, NATIONAL, NATURAL, NCHAR, NCLOB, NESTING, NEW, NEXT, NO, NONE, NORMALIZE, NORMALIZED, NOT, NULL, NULLABLE, NULLIF, NULLS, NUMBER, NUMERIC, OBJECT, OCTETS, OCTET_LENGTH, OF, OFFSET, OLD, ON, ONLY, OPEN, OPTION, OPTIONS, OR, ORDER, ORDERING, ORDINALITY, OTHERS, OUT, OUTER, OUTPUT, OVER, OVERLAPS, OVERLAY, OVERRIDING, PAD, PARAMETER, PARAMETER_MODE, PARAMETER_NAME, PARAMETER_ORDINAL_POSITION, PARAMETER_SPECIFIC_CATALOG, PARAMETER_SPECIFIC_NAME, PARAMETER_SPECIFIC_SCHEMA, PARTIAL, PARTITION, PASCAL, PASSTHROUGH, PATH, PERCENTILE_CONT, PERCENTILE_DISC, PERCENT_RANK, PLACING, PLAN, PLI, POSITION, POWER, PRECEDING, PRECISION, PREPARE, PRESERVE, PRIMARY, PRIOR, PRIVILEGES, PROCEDURE, PUBLIC, QUARTER, RANGE, RANK, READ, READS, REAL, RECURSIVE, REF, 
REFERENCES, REFERENCING, REGR_AVGX, REGR_AVGY, REGR_COUNT, REGR_INTERCEPT, REGR_R2, REGR_SLOPE, REGR_SXX, REGR_SXY, REGR_SYY, RELATIVE, RELEASE, REPEATABLE, RESET, RESTART, RESTRICT, RESULT, RETURN, RETURNED_CARDINALITY, RETURNED_LENGTH, RETURNED_OCTET_LENGTH, RETURNED_SQLSTATE, RETURNS, REVOKE, RIGHT, ROLE, ROLLBACK, ROLLUP, ROUTINE, ROUTINE_CATALOG, ROUTINE_NAME, ROUTINE_SCHEMA, ROW, ROWS, ROW_COUNT, ROW_NUMBER, SAVEPOINT, SCALE, SCHEMA, SCHEMA_NAME, SCOPE, SCOPE_CATALOGS, SCOPE_NAME, SCOPE_SCHEMA, SCROLL, SEARCH, SECOND, SECTION, SECURITY, SELECT, SELF, SENSITIVE, SEQUENCE, SERIALIZABLE, SERVER, SERVER_NAME, SESSION, SESSION_USER, SET, SETS, SIMILAR, SIMPLE, SIZE, SMALLINT, SOME, SOURCE, SPACE, SPECIFIC, SPECIFICTYPE, SPECIFIC_NAME, SQL, SQLEXCEPTION, SQLSTATE, SQLWARNING, SQL_TSI_DAY, SQL_TSI_FRAC_SECOND, SQL_TSI_HOUR, SQL_TSI_MICROSECOND, SQL_TSI_MINUTE, SQL_TSI_MONTH, SQL_TSI_QUARTER, SQL_TSI_SECOND, SQL_TSI_WEEK, SQL_TSI_YEAR, SQRT, START, STATE, STATEMENT, STATIC, STDDEV_POP, STDDEV_SAMP, STREAM, STRUCTURE, STYLE, SUBCLASS_ORIGIN, SUBMULTISET, SUBSTITUTE, SUBSTRING, SUM, SYMMETRIC, SYSTEM, SYSTEM_USER, TABLE, TABLESAMPLE, TABLE_NAME, TEMPORARY, THEN, TIES, TIME, TIMESTAMP, TIMESTAMPADD, TIMESTAMPDIFF, TIMEZONE_HOUR, TIMEZONE_MINUTE, TINYINT, TO, TOP_LEVEL_COUNT, TRAILING, TRANSACTION, TRANSACTIONS_ACTIVE, TRANSACTIONS_COMMITTED, TRANSACTIONS_ROLLED_BACK, TRANSFORM, TRANSFORMS, TRANSLATE, TRANSLATION, TREAT, TRIGGER, TRIGGER_CATALOG, TRIGGER_NAME, TRIGGER_SCHEMA, TRIM, TRUE, TYPE, UESCAPE, UNBOUNDED, UNCOMMITTED, UNDER, UNION, UNIQUE, UNKNOWN, UNNAMED, UNNEST, UPDATE, UPPER, UPSERT, USAGE, USER, USER_DEFINED_TYPE_CATALOG, USER_DEFINED_TYPE_CODE, USER_DEFINED_TYPE_NAME, USER_DEFINED_TYPE_SCHEMA, USING, VALUE, VALUES, VARBINARY, VARCHAR, VARYING, VAR_POP, VAR_SAMP, VERSION, VIEW, WEEK, WHEN, WHENEVER, WHERE, WIDTH_BUCKET, WINDOW, WITH, WITHIN, WITHOUT, WORK, WRAPPER, WRITE, XML, YEAR, ZONE

{% endhighlight %}
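
For example, a query over a hypothetical table t with columns named value and count must quote those identifiers:

{% highlight sql %}
-- `value` and `count` are reserved keywords, so they are quoted with backticks.
SELECT `value`, `count` FROM t
{% endhighlight %}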

Date Format Specifier

{% top %}