title: “Table API - Relational Queries” is_beta: true

The Table API an experimental feature

Flink provides an API that allows specifying operations using SQL-like expressions. Instead of manipulating DataSet or DataStream you work with Table on which relational operations can be performed.

The following dependency must be added to your project when using the Table API:

{% highlight xml %} org.apache.flink flink-table {{site.version }} {% endhighlight %}

Note that the Table API is currently not part of the binary distribution. See linking with it for cluster execution here.

Scala Table API

The Table API can be enabled by importing org.apache.flink.api.scala.table._. This enables implicit conversions that allow converting a DataSet or DataStream to a Table. This example shows how a DataSet can be converted, how relational queries can be specified and how a Table can be converted back to a DataSet:

{% highlight scala %} import org.apache.flink.api.scala._ import org.apache.flink.api.scala.table._

case class WC(word: String, count: Int) val input = env.fromElements(WC(“hello”, 1), WC(“hello”, 1), WC(“ciao”, 1)) val expr = input.toTable val result = expr.groupBy('word).select('word, 'count.sum as 'count).toDataSet[WC] {% endhighlight %}

The expression DSL uses Scala symbols to refer to field names and we use code generation to transform expressions to efficient runtime code. Please note that the conversion to and from Tables only works when using Scala case classes or Flink POJOs. Please check out the programming guide to learn the requirements for a class to be considered a POJO.

This is another example that shows how you can join to Tables:

{% highlight scala %} case class MyResult(a: String, d: Int)

val input1 = env.fromElements(...).toTable('a, 'b) val input2 = env.fromElements(...).toTable('c, 'd) val joined = input1.join(input2).where(“b = a && d > 42”).select(“a, d”).toDataSet[MyResult] {% endhighlight %}

Notice, how a DataSet can be converted to a Table by using as and specifying new names for the fields. This can also be used to disambiguate fields before a join operation. Also, in this example we see that you can also use Strings to specify relational expressions.

Please refer to the Scaladoc (and Javadoc) for a full list of supported operations and a description of the expression syntax.

Java Table API

When using Java, Tables can be converted to and from DataSet and DataStream using TableEnvironment. This example is equivalent to the above Scala Example:

{% highlight java %}

public class WC {

public WC(String word, int count) { this.word = word; this.count = count; }

public WC() {} // empty constructor to satisfy POJO requirements

public String word; public int count; }

...

ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment(); TableEnvironment tableEnv = new TableEnvironment();

DataSet input = env.fromElements( new WC(“Hello”, 1), new WC(“Ciao”, 1), new WC(“Hello”, 1));

Table table = tableEnv.fromDataSet(input);

Table filtered = table .groupBy(“word”) .select(“word.count as count, word”) .filter(“count = 2”);

DataSet result = tableEnv.toDataSet(filtered, WC.class); {% endhighlight %}

When using Java, the embedded DSL for specifying expressions cannot be used. Only String expressions are supported. They support exactly the same feature set as the expression DSL.

Please refer to the Javadoc for a full list of supported operations and a description of the expression syntax.