| --- |
| title: "Concepts & Common API" |
| nav-parent_id: tableapi |
| nav-pos: 0 |
| --- |
| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| --> |
| |
| The Table API and SQL are integrated in a joint API. The central concept of this API is a `Table` which serves as input and output of queries. This document shows the common structure of programs with Table API and SQL queries, how to register a `Table`, how to query a `Table`, and how to emit a `Table`. |
| |
| * This will be replaced by the TOC |
| {:toc} |
| |
| Structure of Table API and SQL Programs |
| --------------------------------------- |
| |
| All Table API and SQL programs for batch and streaming follow the same pattern. The following code example shows the common structure of Table API and SQL programs. |
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| // for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment |
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| |
| // create a TableEnvironment |
| // for batch programs use BatchTableEnvironment instead of StreamTableEnvironment |
| StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); |
| |
| // register a Table |
| tableEnv.registerTable("table1", ...) // or |
| tableEnv.registerTableSource("table2", ...); // or |
| tableEnv.registerCatalog("extCat", ...); |
| |
| // create a Table from a Table API query |
| Table tapiResult = tableEnv.scan("table1").select(...); |
| // create a Table from a SQL query |
| Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ... "); |
| |
| // emit a Table API result Table to a TableSink, same for SQL result |
| tapiResult.writeToSink(...); |
| |
| // execute |
| tableEnv.execute(); |
| |
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
| // for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment |
| val env = StreamExecutionEnvironment.getExecutionEnvironment |
| |
| // create a TableEnvironment |
| val tableEnv = TableEnvironment.getTableEnvironment(env) |
| |
| // register a Table |
| tableEnv.registerTable("table1", ...) // or |
| tableEnv.registerTableSource("table2", ...) // or |
| tableEnv.registerCatalog("extCat", ...) |
| |
| // create a Table from a Table API query |
| val tapiResult = tableEnv.scan("table1").select(...) |
| // Create a Table from a SQL query |
| val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ...") |
| |
| // emit a Table API result Table to a TableSink, same for SQL result |
| tapiResult.writeToSink(...) |
| |
| // execute |
| tableEnv.execute() |
| |
| {% endhighlight %} |
| </div> |
| </div> |
| |
| **Notes:** |
1. Table API and SQL queries can be easily integrated with and embedded into DataStream programs. Have a look at the [Integration with DataStream API](#integration-with-datastream-api) section to learn how DataStreams can be converted into Tables and vice versa.
2. For bounded stream jobs, always use `TableEnvironment.execute()` to submit and run the job.
| |
| {% top %} |
| |
| Create a TableEnvironment |
| ------------------------- |
| |
| The `TableEnvironment` is a central concept of the Table API and SQL integration. It is responsible for: |
| |
| * Registering a `Table` in the internal catalog |
| * Registering a catalog |
| * Executing SQL queries |
| * Registering a user-defined (scalar, table, or aggregation) function |
| * Converting a `DataStream` into a `Table` |
| * Holding a reference to an `ExecutionEnvironment` or `StreamExecutionEnvironment` |
| |
| A `Table` is always bound to a specific `TableEnvironment`. It is not possible to combine tables of different TableEnvironments in the same query, e.g., to join or union them. |
| |
A `TableEnvironment` is created by calling the static `TableEnvironment.getTableEnvironment()` method (for streaming queries) or the `TableEnvironment.getBatchTableEnvironment()` method (for batch queries) with a `StreamExecutionEnvironment` or an `ExecutionEnvironment` and an optional `TableConfig`. The `TableConfig` can be used to configure the `TableEnvironment` or to customize the query optimization and translation process (see [Query Optimization](#query-optimization)).
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| // *************** |
| // STREAMING QUERY |
| // *************** |
| StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment(); |
| // create a TableEnvironment for streaming queries |
| StreamTableEnvironment sTableEnv = TableEnvironment.getTableEnvironment(sEnv); |
| |
| // *********** |
| // BATCH QUERY |
| // *********** |
StreamExecutionEnvironment bEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// create a TableEnvironment for batch queries
BatchTableEnvironment bTableEnv = TableEnvironment.getBatchTableEnvironment(bEnv);
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
| // *************** |
| // STREAMING QUERY |
| // *************** |
| val sEnv = StreamExecutionEnvironment.getExecutionEnvironment |
| // create a TableEnvironment for streaming queries |
| val sTableEnv = TableEnvironment.getTableEnvironment(sEnv) |
| |
| // *********** |
| // BATCH QUERY |
| // *********** |
val bEnv = StreamExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for batch queries
val bTableEnv = TableEnvironment.getBatchTableEnvironment(bEnv)
| {% endhighlight %} |
| </div> |
| </div> |
| |
| {% top %} |
| |
| Register Flink Tables in TableEnvironment |
| ----------------------------------- |
| |
Tables registered via `TableEnvironment` are actually registered in the default catalog and database of the `CatalogManager`. Flink provides a built-in `FlinkInMemoryCatalog`, which implements `ReadableWritableCatalog`, as the default catalog. Users can change the default catalog and database through either the Table API or Flink SQL.
| |
Note that Flink tables cannot be registered in every `ReadableWritableCatalog`. Currently, Flink only supports registering tables in the `FlinkInMemoryCatalog`.
| |
| There are two types of Flink tables, *input tables* and *output tables*. Input tables can be referenced in Table API and SQL queries and provide input data. Output tables can be used to emit the result of a Table API or SQL query to an external system. |
| |
| An input table can be registered from various sources: |
| |
| * an existing `Table` object, usually the result of a Table API or SQL query. |
| * a `TableSource`, which accesses external data, such as a file, database, or messaging system. |
| * a `DataStream` from a DataStream program. Registering a `DataStream` is discussed in the [Integration with DataStream API](#integration-with-datastream-and-dataset-api) section. |
| |
| An output table can be registered using a `TableSink`. |
| |
| ### Register a Table |
| |
| A `Table` is registered in a `TableEnvironment` as follows: |
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently |
| StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); |
| |
| // Table is the result of a simple projection query |
| Table projTable = tableEnv.scan("X").select(...); |
| |
| // register the Table projTable as table "projectedX" to the default catalog and database of CatalogManager |
| tableEnv.registerTable("projectedTable", projTable); |
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
| // get a TableEnvironment |
| val tableEnv = TableEnvironment.getTableEnvironment(env) |
| |
| // Table is the result of a simple projection query |
| val projTable: Table = tableEnv.scan("X").select(...) |
| |
| // register the Table projTable as table "projectedX" to the default catalog and database of CatalogManager |
| tableEnv.registerTable("projectedTable", projTable) |
| {% endhighlight %} |
| </div> |
| </div> |
| |
**Note:** A registered `Table` is treated similarly to a `VIEW` as known from relational database systems, i.e., the query that defines the `Table` is not optimized but will be inlined when another query references the registered `Table`. If multiple queries reference the same registered `Table`, it will be inlined for each referencing query and executed multiple times, i.e., the result of the registered `Table` will *not* be shared (unless sub-plan reuse is enabled, see [Reuse SubPlan](#reuse-subplan)).
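
This view-like behavior can be illustrated with a minimal, hedged sketch (an input table "X" with fields "a" and "b" is assumed to be registered):

{% highlight java %}
// register the result of a projection query as "projectedTable"
Table projTable = tableEnv.scan("X").select("a, b");
tableEnv.registerTable("projectedTable", projTable);

// both queries inline the definition of "projectedTable", i.e.,
// the projection on "X" is translated once per referencing query
Table q1 = tableEnv.scan("projectedTable").select("a");
Table q2 = tableEnv.sqlQuery("SELECT b FROM projectedTable");
{% endhighlight %}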
| |
| {% top %} |
| |
| ### Register a TableSource |
| |
| A `TableSource` provides access to external data which is stored in a storage system such as a database (MySQL, HBase, ...), a file with a specific encoding (CSV, Apache \[Parquet, Avro, ORC\], ...), or a messaging system (Apache Kafka, RabbitMQ, ...). |
| |
| Flink aims to provide TableSources for common data formats and storage systems. Please have a look at the [Table Sources and Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html) page for a list of supported TableSources and instructions for how to build a custom `TableSource`. |
| |
| A `TableSource` is registered in a `TableEnvironment` as follows: |
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently |
| StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); |
| |
| // create a TableSource |
| TableSource csvSource = new CsvTableSource("/path/to/file", ...); |
| |
| // register the TableSource as table "CsvTable" to the default catalog and database of CatalogManager |
| tableEnv.registerTableSource("CsvTable", csvSource); |
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
| // get a TableEnvironment |
| val tableEnv = TableEnvironment.getTableEnvironment(env) |
| |
| // create a TableSource |
| val csvSource: TableSource = new CsvTableSource("/path/to/file", ...) |
| |
| // register the TableSource as table "CsvTable" to the default catalog and database of CatalogManager |
| tableEnv.registerTableSource("CsvTable", csvSource) |
| {% endhighlight %} |
| </div> |
| </div> |
| |
| {% top %} |
| |
| ### Register a TableSink |
| |
| A registered `TableSink` can be used to [emit the result of a Table API or SQL query](common.html#emit-a-table) to an external storage system, such as a database, key-value store, message queue, or file system (in different encodings, e.g., CSV, Apache \[Parquet, Avro, ORC\], ...). |
| |
Flink aims to provide TableSinks for common data formats and storage systems. Please see the [Table Sources and Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html) page for details about available sinks and instructions for how to implement a custom `TableSink`.
| |
| A `TableSink` is registered in a `TableEnvironment` as follows: |
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently |
| StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); |
| |
| // create a TableSink |
| TableSink csvSink = new CsvTableSink("/path/to/file", ...); |
| |
| // define the field names and types |
| String[] fieldNames = {"a", "b", "c"}; |
| DataType[] fieldTypes = {DataTypes.INT, DataTypes.STRING, DataTypes.LONG}; |
| |
| // register the TableSink as table "CsvSinkTable" to the default catalog and database of CatalogManager |
| tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink); |
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
| // get a TableEnvironment |
| val tableEnv = TableEnvironment.getTableEnvironment(env) |
| |
| // create a TableSink |
| val csvSink: TableSink = new CsvTableSink("/path/to/file", ...) |
| |
| // define the field names and types |
| val fieldNames: Array[String] = Array("a", "b", "c") |
| val fieldTypes: Array[DataType] = Array(DataTypes.INT, DataTypes.STRING, DataTypes.LONG) |
| |
| // register the TableSink as table "CsvSinkTable" to the default catalog and database of CatalogManager |
| tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink) |
| {% endhighlight %} |
| </div> |
| </div> |
| |
| {% top %} |
| |
| Catalogs |
| ---------------------------- |
| |
For details about catalogs, see the [Catalog]({{ site.baseurl }}/dev/table/catalog.html) page.
| |
| Query a Table |
| ------------- |
| |
| ### Table API |
| |
| The Table API is a language-integrated query API for Scala and Java. In contrast to SQL, queries are not specified as Strings but are composed step-by-step in the host language. |
| |
| The API is based on the `Table` class which represents a table (streaming or batch) and offers methods to apply relational operations. These methods return a new `Table` object, which represents the result of applying the relational operation on the input `Table`. Some relational operations are composed of multiple method calls such as `table.groupBy(...).select()`, where `groupBy(...)` specifies a grouping of `table`, and `select(...)` the projection on the grouping of `table`. |
| |
| The [Table API]({{ site.baseurl }}/dev/table/tableApi.html) document describes all Table API operations that are supported on streaming and batch tables. |
| |
| The following example shows a simple Table API aggregation query: |
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently |
| StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); |
| |
| // register Orders table |
| |
| // scan registered Orders table |
| Table orders = tableEnv.scan("Orders"); |
| // compute revenue for all customers from France |
| Table revenue = orders |
| .filter("cCountry === 'FRANCE'") |
| .groupBy("cID, cName") |
| .select("cID, cName, revenue.sum AS revSum"); |
| |
| // emit or convert Table |
| // execute query |
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
| // get a TableEnvironment |
| val tableEnv = TableEnvironment.getTableEnvironment(env) |
| |
| // register Orders table |
| |
| // scan registered Orders table |
| val orders = tableEnv.scan("Orders") |
| // compute revenue for all customers from France |
| val revenue = orders |
| .filter('cCountry === "FRANCE") |
| .groupBy('cID, 'cName) |
| .select('cID, 'cName, 'revenue.sum AS 'revSum) |
| |
| // emit or convert Table |
| // execute query |
| {% endhighlight %} |
| |
| **Note:** The Scala Table API uses Scala Symbols, which start with a single tick (`'`) to reference the attributes of a `Table`. The Table API uses Scala implicits. Make sure to import `org.apache.flink.api.scala._` and `org.apache.flink.table.api.scala._` in order to use Scala implicit conversions. |
| </div> |
| </div> |
| |
| {% top %} |
| |
| ### SQL |
| |
| Flink's SQL integration is based on [Apache Calcite](https://calcite.apache.org), which implements the SQL standard. SQL queries are specified as regular Strings. |
| |
| The [SQL]({{ site.baseurl }}/dev/table/sql.html) document describes Flink's SQL support for streaming and batch tables. |
| |
| The following example shows how to specify a query and return the result as a `Table`. |
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently |
| StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); |
| |
| // register Orders table |
| |
| // compute revenue for all customers from France |
| Table revenue = tableEnv.sqlQuery( |
| "SELECT cID, cName, SUM(revenue) AS revSum " + |
| "FROM Orders " + |
| "WHERE cCountry = 'FRANCE' " + |
| "GROUP BY cID, cName" |
| ); |
| |
| // emit or convert Table |
| // execute query |
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
| // get a TableEnvironment |
| val tableEnv = TableEnvironment.getTableEnvironment(env) |
| |
| // register Orders table |
| |
| // compute revenue for all customers from France |
| val revenue = tableEnv.sqlQuery(""" |
| |SELECT cID, cName, SUM(revenue) AS revSum |
| |FROM Orders |
| |WHERE cCountry = 'FRANCE' |
| |GROUP BY cID, cName |
| """.stripMargin) |
| |
| // emit or convert Table |
| // execute query |
| {% endhighlight %} |
| |
| </div> |
| </div> |
| |
| The following example shows how to specify an update query that inserts its result into a registered table. |
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently |
| StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); |
| |
| // register "Orders" table |
| // register "RevenueFrance" output table |
| |
| // compute revenue for all customers from France and emit to "RevenueFrance" |
| tableEnv.sqlUpdate( |
| "INSERT INTO RevenueFrance " + |
| "SELECT cID, cName, SUM(revenue) AS revSum " + |
| "FROM Orders " + |
| "WHERE cCountry = 'FRANCE' " + |
| "GROUP BY cID, cName" |
| ); |
| |
| // execute query |
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
| // get a TableEnvironment |
| val tableEnv = TableEnvironment.getTableEnvironment(env) |
| |
| // register "Orders" table |
| // register "RevenueFrance" output table |
| |
| // compute revenue for all customers from France and emit to "RevenueFrance" |
| tableEnv.sqlUpdate(""" |
| |INSERT INTO RevenueFrance |
| |SELECT cID, cName, SUM(revenue) AS revSum |
| |FROM Orders |
| |WHERE cCountry = 'FRANCE' |
| |GROUP BY cID, cName |
| """.stripMargin) |
| |
| // execute query |
| {% endhighlight %} |
| |
| </div> |
| </div> |
| |
| {% top %} |
| |
| ### Mixing Table API and SQL |
| |
| Table API and SQL queries can be easily mixed because both return `Table` objects: |
| |
| * A Table API query can be defined on the `Table` object returned by a SQL query. |
| * A SQL query can be defined on the result of a Table API query by [registering the resulting Table](#register-a-table) in the `TableEnvironment` and referencing it in the `FROM` clause of the SQL query. |
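
The following is a minimal, hedged sketch of both directions (a registered table named `Orders` and its fields are assumptions for illustration):

{% highlight java %}
// 1) a Table API query on the Table returned by a SQL query
Table frenchOrders = tableEnv.sqlQuery(
  "SELECT cID, cName, revenue FROM Orders WHERE cCountry = 'FRANCE'");
Table revenuePerCustomer = frenchOrders
  .groupBy("cID, cName")
  .select("cID, cName, revenue.sum AS revSum");

// 2) a SQL query on the result of a Table API query:
// register the resulting Table and reference it in the FROM clause
Table bigOrders = tableEnv.scan("Orders").filter("revenue > 1000");
tableEnv.registerTable("BigOrders", bigOrders);
Table sqlResult = tableEnv.sqlQuery(
  "SELECT cID, COUNT(*) AS cnt FROM BigOrders GROUP BY cID");
{% endhighlight %}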
| |
| {% top %} |
| |
| Emit a Table |
| ------------ |
| |
| A `Table` is emitted by writing it to a `TableSink`. A `TableSink` is a generic interface to support a wide variety of file formats (e.g. CSV, Apache Parquet, Apache Avro), storage systems (e.g., JDBC, Apache HBase, Apache Cassandra, Elasticsearch), or messaging systems (e.g., Apache Kafka, RabbitMQ). |
| |
| A batch `Table` can only be written to a `BatchTableSink`, while a streaming `Table` requires either an `AppendStreamTableSink`, a `RetractStreamTableSink`, or an `UpsertStreamTableSink`. |
| |
| Please see the documentation about [Table Sources & Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html) for details about available sinks and instructions for how to implement a custom `TableSink`. |
| |
| There are two ways to emit a table: |
| |
| 1. The `Table.writeToSink(TableSink sink)` method emits the table using the provided `TableSink` and automatically configures the sink with the schema of the table to emit. |
| 2. The `Table.insertInto(String sinkTable)` method looks up a `TableSink` that was registered with a specific schema under the provided name in the `TableEnvironment`'s catalog. The schema of the table to emit is validated against the schema of the registered `TableSink`. |
| |
The following example shows how to emit a `Table`:
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently |
| StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); |
| |
| // compute a result Table using Table API operators and/or SQL queries |
| Table result = ... |
| |
| // create a TableSink |
| TableSink sink = new CsvTableSink("/path/to/file", "|"); |
| |
| // METHOD 1: |
| // Emit the result Table to the TableSink via the writeToSink() method |
| result.writeToSink(sink); |
| |
| // METHOD 2: |
| // Register the TableSink with a specific schema |
| String[] fieldNames = {"a", "b", "c"}; |
| DataType[] fieldTypes = {DataTypes.INT, DataTypes.STRING, DataTypes.LONG}; |
| tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink); |
| // Emit the result Table to the registered TableSink via the insertInto() method |
| result.insertInto("CsvSinkTable"); |
| |
| // execute the program |
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
| // get a TableEnvironment |
| val tableEnv = TableEnvironment.getTableEnvironment(env) |
| |
| // compute a result Table using Table API operators and/or SQL queries |
| val result: Table = ... |
| |
| // create a TableSink |
| val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|") |
| |
| // METHOD 1: |
| // Emit the result Table to the TableSink via the writeToSink() method |
| result.writeToSink(sink) |
| |
| // METHOD 2: |
| // Register the TableSink with a specific schema |
| val fieldNames: Array[String] = Array("a", "b", "c") |
| val fieldTypes: Array[DataType] = Array(DataTypes.INT, DataTypes.STRING, DataTypes.LONG) |
| tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink) |
| // Emit the result Table to the registered TableSink via the insertInto() method |
| result.insertInto("CsvSinkTable") |
| |
| // execute the program |
| {% endhighlight %} |
| </div> |
| </div> |
| |
| {% top %} |
| |
| |
| Translate and Execute a Query |
| ----------------------------- |
| |
Table API and SQL queries are translated into [DataStream]({{ site.baseurl }}/dev/datastream_api.html) programs regardless of whether their input is streaming or batch. A query is internally represented as a logical query plan and is translated in two phases:
| |
| 1. optimization of the logical plan, |
| 2. translation into a DataStream program. |
| |
| A Table API or SQL query is translated when: |
| |
| * a `Table` is emitted to a `TableSink`, i.e., when `Table.writeToSink()` or `Table.insertInto()` is called. |
| * a SQL update query is specified, i.e., when `TableEnvironment.sqlUpdate()` is called. |
* a `Table` is converted into a `DataStream` (see [Integration with DataStream API](#integration-with-datastream-api)).
| |
| Once translated, a Table API or SQL query is handled like a regular DataStream program and is executed when `StreamExecutionEnvironment.execute()` is called. |
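
As a hedged illustration, the sequence below shows where translation and execution happen (a registered table `Orders` and a configured `csvSink` are assumptions for illustration):

{% highlight java %}
// define a query; nothing is translated yet
Table result = tableEnv.scan("Orders").select("cID, revenue");

// emitting the Table triggers translation into a DataStream program ...
result.writeToSink(csvSink);

// ... which is executed together with the rest of the program
// (for bounded stream jobs, use tableEnv.execute() instead,
//  see the notes in the program structure section above)
env.execute();
{% endhighlight %}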
| |
| {% top %} |
| |
| Integration with DataStream API |
| ------------------------------------------- |
| |
Table API and SQL queries can be easily integrated with and embedded into [DataStream]({{ site.baseurl }}/dev/datastream_api.html) programs. For instance, it is possible to query an external table (for example from an RDBMS), do some pre-processing, such as filtering, projecting, aggregating, or joining with metadata, and then further process the data with the DataStream API (and any of the libraries built on top of it, such as CEP or Gelly). Inversely, a Table API or SQL query can also be applied on the result of a DataStream program.
| |
| This interaction can be achieved by converting a `DataStream` into a `Table` and vice versa. In this section, we describe how these conversions are done. |
| |
| ### Implicit Conversion for Scala |
| |
The Scala Table API features implicit conversions for the `DataStream` and `Table` classes. These conversions are enabled by importing the package `org.apache.flink.table.api.scala._` in addition to `org.apache.flink.api.scala._` for the Scala DataStream API.
| |
| ### Register a DataStream as Table |
| |
| A `DataStream` can be registered in a `TableEnvironment` as a Table. The schema of the resulting table depends on the data type of the registered `DataStream`. Please check the section about [mapping of data types to table schema](#mapping-of-data-types-to-table-schema) for details. |
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| // get StreamTableEnvironment |
| // registration of a DataStream in a BatchTableEnvironment is equivalent |
| StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); |
| |
| DataStream<Tuple2<Long, String>> stream = ... |
| |
| // register the DataStream as Table "myTable" with fields "f0", "f1" |
| tableEnv.registerDataStream("myTable", stream); |
| |
| // register the DataStream as table "myTable2" with fields "myLong", "myString" |
| tableEnv.registerDataStream("myTable2", stream, "myLong, myString"); |
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
| // get TableEnvironment |
| // registration of a bounded DataStream is equivalent |
| val tableEnv = TableEnvironment.getTableEnvironment(env) |
| |
| val stream: DataStream[(Long, String)] = ... |
| |
| // register the DataStream as Table "myTable" with fields "f0", "f1" |
| tableEnv.registerDataStream("myTable", stream) |
| |
| // register the DataStream as table "myTable2" with fields "myLong", "myString" |
| tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString) |
| {% endhighlight %} |
| </div> |
| </div> |
| |
| **Note:** The name of a `DataStream` `Table` must not match the `^_DataStreamTable_[0-9]+` pattern and the name of a `DataSet` `Table` must not match the `^_DataSetTable_[0-9]+` pattern. These patterns are reserved for internal use only. |
| |
| {% top %} |
| |
| ### Convert a DataStream into a Table |
| |
| Instead of registering a `DataStream` into a `TableEnvironment`, it can also be directly converted into a `Table`. This is convenient if you want to use the Table in a Table API query. |
| |
#### Convert a DataStream into a Table (unbounded)
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| // get StreamTableEnvironment |
| StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); |
| |
| DataStream<Tuple2<Long, String>> stream = ... |
| |
| // Convert the DataStream into a Table with default fields "f0", "f1" |
| Table table1 = tableEnv.fromDataStream(stream); |
| |
| // Convert the DataStream into a Table with fields "myLong", "myString" |
| Table table2 = tableEnv.fromDataStream(stream, "myLong, myString"); |
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
| // get StreamTableEnvironment |
| val tableEnv = TableEnvironment.getTableEnvironment(env) |
| |
| val stream: DataStream[(Long, String)] = ... |
| |
| // convert the DataStream into a Table with default fields '_1, '_2 |
| val table1: Table = tableEnv.fromDataStream(stream) |
| |
| // convert the DataStream into a Table with fields 'myLong, 'myString |
| val table2: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString) |
| {% endhighlight %} |
| </div> |
| </div> |
| |
#### Convert a DataStream into a Table (bounded)
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| // get BatchTableEnvironment |
| BatchTableEnvironment tableEnv = TableEnvironment.getBatchTableEnvironment(env); |
| |
| DataStream<Tuple2<Long, String>> stream = ... |
| |
| // Convert the DataStream into a Table with default fields "f0", "f1" |
| Table table1 = tableEnv.fromBoundedStream(stream); |
| |
| // Convert the DataStream into a Table with fields "myLong", "myString" |
| Table table2 = tableEnv.fromBoundedStream(stream, "myLong, myString"); |
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
| // get BatchTableEnvironment |
| val tableEnv = TableEnvironment.getBatchTableEnvironment(env) |
| |
| val stream: DataStream[(Long, String)] = ... |
| |
| // convert the DataStream into a Table with default fields '_1, '_2 |
| val table1: Table = tableEnv.fromBoundedStream(stream) |
| |
| // convert the DataStream into a Table with fields 'myLong, 'myString |
| val table2: Table = tableEnv.fromBoundedStream(stream, 'myLong, 'myString) |
| {% endhighlight %} |
| </div> |
| </div> |
| |
| {% top %} |
| |
| ### Convert a Table into a DataStream |
| |
A `Table` can be converted into a `DataStream`. In this way, a custom DataStream program can be run on the result of a Table API or SQL query.
| |
| When converting a `Table` into a `DataStream`, you need to specify the data type of the resulting `DataStream`, i.e., the data type into which the rows of the `Table` are to be converted. Often the most convenient conversion type is `Row`. The following list gives an overview of the features of the different options: |
| |
| - **Row**: fields are mapped by position, arbitrary number of fields, support for `null` values, no type-safe access. |
| - **POJO**: fields are mapped by name (POJO fields must be named as `Table` fields), arbitrary number of fields, support for `null` values, type-safe access. |
| - **Case Class**: fields are mapped by position, no support for `null` values, type-safe access. |
| - **Tuple**: fields are mapped by position, limitation to 22 (Scala) or 25 (Java) fields, no support for `null` values, type-safe access. |
| - **Atomic Type**: `Table` must have a single field, no support for `null` values, type-safe access. |
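
For instance, the POJO target can be sketched as follows, using the append-stream conversion shown in the next subsection (a POJO `Person` with fields `name` and `age` matching the `Table` schema is an assumption for illustration):

{% highlight java %}
// convert the Table into an append DataStream of Person POJOs;
// fields are mapped by name, so the POJO field names must match the Table field names
DataStream<Person> dsPerson = tableEnv.toAppendStream(table, Person.class);
{% endhighlight %}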
| |
#### Convert a Table into a DataStream (unbounded)
| |
| A `Table` that is the result of a streaming query will be updated dynamically, i.e., it is changing as new records arrive on the query's input streams. Hence, the `DataStream` into which such a dynamic query is converted needs to encode the updates of the table. |
| |
| There are two modes to convert a `Table` into a `DataStream`: |
| |
| 1. **Append Mode**: This mode can only be used if the dynamic `Table` is only modified by `INSERT` changes, i.e, it is append-only and previously emitted results are never updated. |
| 2. **Retract Mode**: This mode can always be used. It encodes `INSERT` and `DELETE` changes with a `boolean` flag. |
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| // get StreamTableEnvironment. |
| StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); |
| |
| // Table with two fields (String name, Integer age) |
| Table table = ... |
| |
| // convert the Table into an append DataStream of Row by specifying the class |
| DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class); |
| |
| // convert the Table into an append DataStream of Tuple2<String, Integer> |
| // via a TypeInformation |
| TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>( |
| Types.STRING(), |
| Types.INT()); |
| DataStream<Tuple2<String, Integer>> dsTuple = |
| tableEnv.toAppendStream(table, tupleType); |
| |
| // convert the Table into a retract DataStream of Row. |
| // A retract stream of type X is a DataStream<Tuple2<Boolean, X>>. |
| // The boolean field indicates the type of the change. |
| // True is INSERT, false is DELETE. |
| DataStream<Tuple2<Boolean, Row>> retractStream = |
| tableEnv.toRetractStream(table, Row.class); |
| |
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
// get TableEnvironment
| val tableEnv = TableEnvironment.getTableEnvironment(env) |
| |
| // Table with two fields (String name, Integer age) |
| val table: Table = ... |
| |
| // convert the Table into an append DataStream of Row |
| val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table) |
| |
| // convert the Table into an append DataStream of Tuple2[String, Int] |
val dsTuple: DataStream[(String, Int)] =
| tableEnv.toAppendStream[(String, Int)](table) |
| |
| // convert the Table into a retract DataStream of Row. |
| // A retract stream of type X is a DataStream[(Boolean, X)]. |
| // The boolean field indicates the type of the change. |
| // True is INSERT, false is DELETE. |
| val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table) |
| {% endhighlight %} |
| </div> |
| </div> |
| |
| **Note:** A detailed discussion about dynamic tables and their properties is given in the [Streaming Queries]({{ site.baseurl }}/dev/table/streaming.html) document. |
| |
#### Convert a Table into a DataStream (bounded)
| |
| A `Table` is converted into a bounded `DataStream` as follows: |
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| // get BatchTableEnvironment |
BatchTableEnvironment tableEnv = TableEnvironment.getBatchTableEnvironment(env);
| |
| // Table with two fields (String name, Integer age) |
| Table table = ... |
| |
// create a BatchTableSink
BatchTableSink tSink = ...

// convert the Table into a bounded DataStream by specifying the BatchTableSink
DataStream<Row> dsRow = tableEnv.toBoundedStream(table, tSink);
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
// get BatchTableEnvironment
val tableEnv = TableEnvironment.getBatchTableEnvironment(env)
| |
| // Table with two fields (String name, Integer age) |
| val table: Table = ... |
| val tableSink: BatchTableSink = ... |
| |
| // convert the Table into a bounded DataStream of Row |
| val dsRow: DataStream[Row] = tableEnv.toBoundedStream[Row](table, tableSink) |
| |
| // convert the Table into a bounded DataStream of Tuple2[String, Int] |
| val dsTuple: DataStream[(String, Int)] = tableEnv.toBoundedStream[(String, Int)](table, tableSink) |
| {% endhighlight %} |
| </div> |
| </div> |
| |
| {% top %} |
| |
| ### Mapping of Data Types to Table Schema |
| |
| Flink's DataStream APIs support very diverse types. Composite types such as Tuples (built-in Scala and Flink Java tuples), POJOs, Scala case classes, and Flink's Row type allow for nested data structures with multiple fields that can be accessed in table expressions. Other types are treated as atomic types. In the following, we describe how the Table API converts these types into an internal row representation and show examples of converting a `DataStream` into a `Table`. |
| |
| The mapping of a data type to a table schema can happen in two ways: **based on the field positions** or **based on the field names**. |
| |
| **Position-based Mapping** |
| |
| Position-based mapping can be used to give fields a more meaningful name while keeping the field order. This mapping is available for composite data types *with a defined field order* as well as atomic types. Composite data types such as tuples, rows, and case classes have such a field order. However, fields of a POJO must be mapped based on the field names (see next section). |
| |
| When defining a position-based mapping, the specified names must not exist in the input data type, otherwise the API will assume that the mapping should happen based on the field names. If no field names are specified, the default field names and field order of the composite type are used or `f0` for atomic types. |
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently |
| StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); |
| |
| DataStream<Tuple2<Long, Integer>> stream = ... |
| |
| // convert DataStream into Table with default field names "f0" and "f1" |
| Table table = tableEnv.fromDataStream(stream); |
| |
| // convert DataStream into Table with field names "myLong" and "myInt" |
| Table table = tableEnv.fromDataStream(stream, "myLong, myInt"); |
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
| // get a TableEnvironment |
| val tableEnv = TableEnvironment.getTableEnvironment(env) |
| |
| val stream: DataStream[(Long, Int)] = ... |
| |
| // convert DataStream into Table with default field names "_1" and "_2" |
| val table: Table = tableEnv.fromDataStream(stream) |
| |
| // convert DataStream into Table with field names "myLong" and "myInt" |
val table: Table = tableEnv.fromDataStream(stream, 'myLong, 'myInt)
| {% endhighlight %} |
| </div> |
| </div> |
| |
| **Name-based Mapping** |
| |
Name-based mapping can be used for any data type including POJOs. It is the most flexible way of defining a table schema mapping. All fields in the mapping are referenced by name and can be renamed using an alias (`as`). Fields can be reordered and projected out.
| |
| If no field names are specified, the default field names and field order of the composite type are used or `f0` for atomic types. |
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently |
| StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); |
| |
| DataStream<Tuple2<Long, Integer>> stream = ... |
| |
| // convert DataStream into Table with default field names "f0" and "f1" |
| Table table = tableEnv.fromDataStream(stream); |
| |
| // convert DataStream into Table with field "f1" only |
| Table table = tableEnv.fromDataStream(stream, "f1"); |
| |
| // convert DataStream into Table with swapped fields |
| Table table = tableEnv.fromDataStream(stream, "f1, f0"); |
| |
| // convert DataStream into Table with swapped fields and field names "myInt" and "myLong" |
| Table table = tableEnv.fromDataStream(stream, "f1 as myInt, f0 as myLong"); |
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
| // get a TableEnvironment |
| val tableEnv = TableEnvironment.getTableEnvironment(env) |
| |
| val stream: DataStream[(Long, Int)] = ... |
| |
| // convert DataStream into Table with default field names "_1" and "_2" |
| val table: Table = tableEnv.fromDataStream(stream) |
| |
| // convert DataStream into Table with field "_2" only |
| val table: Table = tableEnv.fromDataStream(stream, '_2) |
| |
| // convert DataStream into Table with swapped fields |
| val table: Table = tableEnv.fromDataStream(stream, '_2, '_1) |
| |
| // convert DataStream into Table with swapped fields and field names "myInt" and "myLong" |
| val table: Table = tableEnv.fromDataStream(stream, '_2 as 'myInt, '_1 as 'myLong) |
| {% endhighlight %} |
| </div> |
| </div> |
| |
| #### Atomic Types |
| |
| Flink treats primitives (`Integer`, `Double`, `String`) or generic types (types that cannot be analyzed and decomposed) as atomic types. A `DataStream` of an atomic type is converted into a `Table` with a single attribute. The type of the attribute is inferred from the atomic type and the name of the attribute can be specified. |
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently |
| StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); |
| |
| DataStream<Long> stream = ... |
| |
| // convert DataStream into Table with default field name "f0" |
| Table table = tableEnv.fromDataStream(stream); |
| |
| // convert DataStream into Table with field name "myLong" |
| Table table = tableEnv.fromDataStream(stream, "myLong"); |
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
| // get a TableEnvironment |
| val tableEnv = TableEnvironment.getTableEnvironment(env) |
| |
| val stream: DataStream[Long] = ... |
| |
| // convert DataStream into Table with default field name "f0" |
| val table: Table = tableEnv.fromDataStream(stream) |
| |
| // convert DataStream into Table with field name "myLong" |
| val table: Table = tableEnv.fromDataStream(stream, 'myLong) |
| {% endhighlight %} |
| </div> |
| </div> |
| |
| #### Tuples (Scala and Java) and Case Classes (Scala only) |
| |
| Flink supports Scala's built-in tuples and provides its own tuple classes for Java. DataStreams of both kinds of tuples can be converted into tables. Fields can be renamed by providing names for all fields (mapping based on position). If no field names are specified, the default field names are used. If the original field names (`f0`, `f1`, ... for Flink Tuples and `_1`, `_2`, ... for Scala Tuples) are referenced, the API assumes that the mapping is name-based instead of position-based. Name-based mapping allows for reordering fields and projection with alias (`as`). |
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently |
| StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); |
| |
| DataStream<Tuple2<Long, String>> stream = ... |
| |
| // convert DataStream into Table with default field names "f0", "f1" |
| Table table = tableEnv.fromDataStream(stream); |
| |
| // convert DataStream into Table with renamed field names "myLong", "myString" (position-based) |
| Table table = tableEnv.fromDataStream(stream, "myLong, myString"); |
| |
| // convert DataStream into Table with reordered fields "f1", "f0" (name-based) |
| Table table = tableEnv.fromDataStream(stream, "f1, f0"); |
| |
| // convert DataStream into Table with projected field "f1" (name-based) |
| Table table = tableEnv.fromDataStream(stream, "f1"); |
| |
| // convert DataStream into Table with reordered and aliased fields "myString", "myLong" (name-based) |
| Table table = tableEnv.fromDataStream(stream, "f1 as 'myString', f0 as 'myLong'"); |
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
| // get a TableEnvironment |
| val tableEnv = TableEnvironment.getTableEnvironment(env) |
| |
| val stream: DataStream[(Long, String)] = ... |
| |
// convert DataStream into Table with default field names '_1, '_2
| val table: Table = tableEnv.fromDataStream(stream) |
| |
| // convert DataStream into Table with field names "myLong", "myString" (position-based) |
| val table: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString) |
| |
| // convert DataStream into Table with reordered fields "_2", "_1" (name-based) |
| val table: Table = tableEnv.fromDataStream(stream, '_2, '_1) |
| |
| // convert DataStream into Table with projected field "_2" (name-based) |
| val table: Table = tableEnv.fromDataStream(stream, '_2) |
| |
| // convert DataStream into Table with reordered and aliased fields "myString", "myLong" (name-based) |
| val table: Table = tableEnv.fromDataStream(stream, '_2 as 'myString, '_1 as 'myLong) |
| |
| // define case class |
| case class Person(name: String, age: Int) |
| val streamCC: DataStream[Person] = ... |
| |
| // convert DataStream into Table with default field names 'name, 'age |
| val table = tableEnv.fromDataStream(streamCC) |
| |
| // convert DataStream into Table with field names 'myName, 'myAge (position-based) |
| val table = tableEnv.fromDataStream(streamCC, 'myName, 'myAge) |
| |
| // convert DataStream into Table with reordered and aliased fields "myAge", "myName" (name-based) |
val table: Table = tableEnv.fromDataStream(streamCC, 'age as 'myAge, 'name as 'myName)
| |
| {% endhighlight %} |
| </div> |
| </div> |
| |
| #### POJO (Java and Scala) |
| |
| Flink supports POJOs as composite types. The rules for what determines a POJO are documented [here]({{ site.baseurl }}/dev/api_concepts.html#pojos). |
| |
| When converting a POJO `DataStream` into a `Table` without specifying field names, the names of the original POJO fields are used. The name mapping requires the original names and cannot be done by positions. Fields can be renamed using an alias (with the `as` keyword), reordered, and projected. |
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently |
| StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); |
| |
| // Person is a POJO with fields "name" and "age" |
| DataStream<Person> stream = ... |
| |
| // convert DataStream into Table with default field names "age", "name" (fields are ordered by name!) |
| Table table = tableEnv.fromDataStream(stream); |
| |
| // convert DataStream into Table with renamed fields "myAge", "myName" (name-based) |
| Table table = tableEnv.fromDataStream(stream, "age as myAge, name as myName"); |
| |
| // convert DataStream into Table with projected field "name" (name-based) |
| Table table = tableEnv.fromDataStream(stream, "name"); |
| |
| // convert DataStream into Table with projected and renamed field "myName" (name-based) |
| Table table = tableEnv.fromDataStream(stream, "name as myName"); |
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
| // get a TableEnvironment |
| val tableEnv = TableEnvironment.getTableEnvironment(env) |
| |
| // Person is a POJO with field names "name" and "age" |
| val stream: DataStream[Person] = ... |
| |
| // convert DataStream into Table with default field names "age", "name" (fields are ordered by name!) |
| val table: Table = tableEnv.fromDataStream(stream) |
| |
| // convert DataStream into Table with renamed fields "myAge", "myName" (name-based) |
| val table: Table = tableEnv.fromDataStream(stream, 'age as 'myAge, 'name as 'myName) |
| |
| // convert DataStream into Table with projected field "name" (name-based) |
| val table: Table = tableEnv.fromDataStream(stream, 'name) |
| |
| // convert DataStream into Table with projected and renamed field "myName" (name-based) |
| val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName) |
| {% endhighlight %} |
| </div> |
| </div> |
| |
| #### Row |
| |
| The `Row` data type supports an arbitrary number of fields and fields with `null` values. Field names can be specified via a `RowTypeInfo` or when converting a `Row` `DataStream` into a `Table`. The row type supports mapping of fields by position and by name. Fields can be renamed by providing names for all fields (mapping based on position) or selected individually for projection/ordering/renaming (mapping based on name). |
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| // get a StreamTableEnvironment, works for BatchTableEnvironment equivalently |
| StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); |
| |
| // DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo` |
| DataStream<Row> stream = ... |
| |
| // convert DataStream into Table with default field names "name", "age" |
| Table table = tableEnv.fromDataStream(stream); |
| |
| // convert DataStream into Table with renamed field names "myName", "myAge" (position-based) |
| Table table = tableEnv.fromDataStream(stream, "myName, myAge"); |
| |
| // convert DataStream into Table with renamed fields "myName", "myAge" (name-based) |
| Table table = tableEnv.fromDataStream(stream, "name as myName, age as myAge"); |
| |
| // convert DataStream into Table with projected field "name" (name-based) |
| Table table = tableEnv.fromDataStream(stream, "name"); |
| |
| // convert DataStream into Table with projected and renamed field "myName" (name-based) |
| Table table = tableEnv.fromDataStream(stream, "name as myName"); |
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
| // get a TableEnvironment |
| val tableEnv = TableEnvironment.getTableEnvironment(env) |
| |
| // DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo` |
| val stream: DataStream[Row] = ... |
| |
| // convert DataStream into Table with default field names "name", "age" |
| val table: Table = tableEnv.fromDataStream(stream) |
| |
| // convert DataStream into Table with renamed field names "myName", "myAge" (position-based) |
| val table: Table = tableEnv.fromDataStream(stream, 'myName, 'myAge) |
| |
| // convert DataStream into Table with renamed fields "myName", "myAge" (name-based) |
| val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName, 'age as 'myAge) |
| |
| // convert DataStream into Table with projected field "name" (name-based) |
| val table: Table = tableEnv.fromDataStream(stream, 'name) |
| |
| // convert DataStream into Table with projected and renamed field "myName" (name-based) |
| val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName) |
| {% endhighlight %} |
| </div> |
| </div> |
| |
| {% top %} |
| |
| |
| Query Optimization |
| ------------------ |
| |
The foundation of Apache Flink's query optimization is Apache Calcite. In addition to applying Calcite's optimizations, Flink adds many enhancements of its own.
| |
First of all, Flink applies a series of rule-based and cost-based optimizations, including:
* special subquery rewriting, consisting of two parts: 1. converting IN and EXISTS into left semi-joins, and 2. converting NOT IN and NOT EXISTS into left anti-joins. Note: only IN/EXISTS/NOT IN/NOT EXISTS in conjunctive conditions are supported (see the sketch after this list).
| * normal subquery decorrelation based on Calcite |
| * projection pruning |
| * filter push down |
| * partition pruning |
| * join reorder if it is enabled (`sql.optimizer.join-reorder.enabled` is true) |
| * skew join optimization |
| * other kinds of query rewriting |
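
As a hedged illustration of the first rewrite, the following query uses IN inside a conjunctive WHERE condition and is therefore eligible for the semi-join conversion (the table and column names are assumptions for illustration):

{% highlight java %}
// the IN subquery appears in a conjunctive condition, so the optimizer
// may rewrite it into a left semi-join as described above
Table rewritten = tableEnv.sqlQuery(
  "SELECT o.cID, o.revenue " +
  "FROM Orders o " +
  "WHERE o.cCountry = 'FRANCE' " +
  "AND o.cID IN (SELECT cID FROM PreferredCustomers)");
{% endhighlight %}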
| |
Secondly, Flink collects rich statistics from data sources and propagates them through the whole plan via various extended `MetadataHandler`s. The optimizer can choose a better plan based on this metadata.
| |
Finally, Flink provides a fine-grained cost for each operator that takes I/O, CPU, network, and memory into account. Cost-based optimization can choose a better plan based on this fine-grained cost definition.
| |
It is possible to customize the optimization programs (see `FlinkBatchPrograms`, the default optimization programs for batch, and `FlinkStreamPrograms`, the default optimization programs for streaming, as references) and replace the default programs by providing a `CalciteConfig` object. It can be created via a builder by calling `CalciteConfig.createBuilder()` and is provided to the `TableEnvironment` by calling `tableEnv.getConfig.setCalciteConfig(calciteConfig)`.
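
A minimal, hedged sketch of this customization is shown below; the concrete builder methods for replacing the default programs depend on the `CalciteConfig` API, and the `build()` finalizer is an assumption here:

{% highlight java %}
// create a custom CalciteConfig via the builder
// (the methods for swapping in custom optimization programs are omitted;
//  build() is assumed to finalize the builder)
CalciteConfig calciteConfig = CalciteConfig.createBuilder()
  // ... replace or extend the default FlinkStreamPrograms / FlinkBatchPrograms here ...
  .build();

// provide the custom configuration to the TableEnvironment
tableEnv.getConfig().setCalciteConfig(calciteConfig);
{% endhighlight %}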
| |
| ### Reuse SubPlan |
Flink tries to find duplicate sub-plans by comparing the digests of physical sub-plans and reuses them if sub-plan reuse is enabled (`sql.optimizer.reuse.sub-plan.enabled` is true; the default is false).
| |
**Note:** Sub-plan reuse is currently supported for batch queries.
| |
| The following code example shows the physical plan when reuse sub-plan is enabled. |
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| // create a BatchTableEnvironment |
BatchTableEnvironment tEnv = TableEnvironment.getBatchTableEnvironment(env);
| |
| // register Orders table |
| |
| // this part is reusable |
| Table table = tEnv.scan("Orders") |
| .groupBy("cID, cName") |
| .select("cID, cName, revenue.sum as revSum, revenue.avg as revAvg"); |
| |
| Table table1 = table.select("cID as cID1, cName as cName1, revSum as revSum1, revAvg as revAvg1"); |
| Table table2 = table.select("cID as cID2, cName as cName2, revSum as revSum2, revAvg as revAvg2"); |
| Table result = table1.join(table2, "revSum1 = revAvg2 && cID1 <> cID2"); |
| |
| // show plan with reuse info |
| String plan = tEnv.explain(result); |
| System.out.println(plan); |
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
| val env = StreamExecutionEnvironment.getExecutionEnvironment |
| // create a BatchTableEnvironment |
| val tEnv = TableEnvironment.getBatchTableEnvironment(env) |
| |
| val table = tEnv.scan("Orders") |
| .groupBy('cID, 'cName) |
| .select('cID, 'cName, 'revenue.sum as 'revSum, 'revenue.avg as 'revAvg) |
| |
| val table1 = table.select('cID as 'cID1, 'cName as 'cName1, 'revSum as 'revSum1, 'revAvg as 'revAvg1) |
| val table2 = table.select('cID as 'cID2, 'cName as 'cName2, 'revSum as 'revSum2, 'revAvg as 'revAvg2) |
| val result = table1.join(table2, "revSum1 = revAvg2 && cID1 <> cID2") |
| |
| // show plan with reuse info |
| val plan = tEnv.explain(result) |
| println(plan) |
| {% endhighlight %} |
| </div> |
| </div> |
| |
The `explain` result is as follows (only the physical plan is shown here; the sub-plan of `SortAggregate` is reused):
| {% highlight text %} |
| SortMergeJoin(where=[AND(=(revSum, revAvg0), <>(cID, cID0))], join=[cID, cName, revSum, revAvg, cID0, cName0, revSum0, revAvg0], joinType=[InnerJoin]) |
| :- Exchange(distribution=[hash[revSum]]) |
| : +- SortAggregate(isMerge=[false], groupBy=[cID, cName], select=[cID, cName, SUM(revenue) AS revSum, AVG(revenue) AS revAvg], reuse_id=[1]) |
| : +- Sort(orderBy=[cID ASC, cName ASC]) |
| : +- Exchange(distribution=[hash[cID, cName]]) |
| : +- TableSourceScan(table=[[builtin, default, Orders, source: [selectedFields=[cID, cName, revenue]]]], fields=[cID, cName, revenue]) |
| +- Exchange(distribution=[hash[revAvg]]) |
| +- Reused(reference_id=[1]) |
| {% endhighlight %} |
| |
| {% top %} |
| |
| ### Explaining a Table |
| |
| The Table API provides a mechanism to explain the logical and optimized query plans to compute a `Table`. |
| This is done through the `TableEnvironment.explain(table)` method. It returns a String describing three plans: |
| |
| 1. the Abstract Syntax Tree of the relational query, i.e., the unoptimized logical query plan, |
| 2. the optimized logical query plan, and |
| 3. the physical execution plan. |
| |
| The following code shows an example and the corresponding output: |
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); |
| |
| DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello")); |
| DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello")); |
| |
| Table table1 = tEnv.fromDataStream(stream1, "count, word"); |
| Table table2 = tEnv.fromDataStream(stream2, "count, word"); |
| Table table = table1 |
| .where("LIKE(word, 'F%')") |
| .unionAll(table2); |
| |
| String explanation = tEnv.explain(table); |
| System.out.println(explanation); |
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
| val env = StreamExecutionEnvironment.getExecutionEnvironment |
| val tEnv = TableEnvironment.getTableEnvironment(env) |
| |
| val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word) |
| val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word) |
| val table = table1 |
| .where('word.like("F%")) |
| .unionAll(table2) |
| |
| val explanation: String = tEnv.explain(table) |
| println(explanation) |
| {% endhighlight %} |
| </div> |
| </div> |
| |
| {% highlight text %} |
| == Abstract Syntax Tree == |
| LogicalUnion(all=[true]) |
| LogicalFilter(condition=[LIKE($1, 'F%')]) |
| LogicalTableScan(table=[[builtin, default, _DataStreamTable_0]]) |
| LogicalTableScan(table=[[builtin, default, _DataStreamTable_1]]) |
| |
| == Optimized Logical Plan == |
| DataStreamUnion(union=[count, word]) |
| DataStreamCalc(select=[count, word], where=[LIKE(word, 'F%')]) |
| DataStreamScan(table=[[builtin, default, _DataStreamTable_0]]) |
| DataStreamScan(table=[[builtin, default, _DataStreamTable_1]]) |
| |
| == Physical Execution Plan == |
| Stage 1 : Data Source |
| content : collect elements with CollectionInputFormat |
| |
| Stage 2 : Data Source |
| content : collect elements with CollectionInputFormat |
| |
| Stage 3 : Operator |
| content : from: (count, word) |
| ship_strategy : REBALANCE |
| |
| Stage 4 : Operator |
| content : where: (LIKE(word, 'F%')), select: (count, word) |
| ship_strategy : FORWARD |
| |
| Stage 5 : Operator |
| content : from: (count, word) |
| ship_strategy : REBALANCE |
| {% endhighlight %} |
| |
| ### Alter Table Statistics |
| |
The Table API provides a mechanism to fetch or modify the statistics of a `Table`, which are represented by the `FlinkStatistic` structure.
A `FlinkStatistic` contains the following information about a Flink `Table`:

1. The columns that can be treated as unique keys.
2. Statistics of skewed column names and values.
3. Column monotonicity.
4. `TableStats`, which contains a `ColumnStats` for each column.
| |
A `ColumnStats` contains the following information about a `Table` column:

1. The number of distinct values (NDV).
2. The count of null values.
3. The average length of the column values.
4. The maximum length of the column values.
5. The maximum of the column values.
6. The minimum of the column values.
| |
These statistics are very important for query optimization; Flink tries to choose the best query plan based on them.
| |
The following code shows how to fetch and modify table statistics:
| |
| <div class="codetabs" markdown="1"> |
| <div data-lang="java" markdown="1"> |
| {% highlight java %} |
| StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |
| StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); |
| |
| // Get table stats |
| String tablePath = "catalog1.database1.table1"; |
| TableStats tableStats = tEnv.getTableStats(tablePath); |
| |
| // modify table stats |
| TableStats tableStats1 = ... |
String[] tablePath1 = ...
| tEnv.alterTableStats(tablePath1, tableStats1); |
| |
| {% endhighlight %} |
| </div> |
| |
| <div data-lang="scala" markdown="1"> |
| {% highlight scala %} |
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)
| |
| // Get table stats |
| val tablePath = "catalog1.database1.table1" |
| val tableStats = tEnv.getTableStats(tablePath) |
| |
| // modify table stats |
| val tableStats1 = ... |
| val tablePath1 = ... |
| tEnv.alterTableStats(tablePath1, tableStats1) |
| |
| {% endhighlight %} |
| </div> |
| </div> |
| {% top %} |
| |
| |