Submit SQL Query via SQL Client

The following examples showcase different applications of Flink SQL, from simple word counting to PV-UV statistics. They are submitted via the SQL Client and run on a local standalone cluster.

Start a local cluster first:

{% highlight bash %}
$ ./bin/start-cluster.sh
{% endhighlight %}

Check the web frontend at http://localhost:8081 and make sure everything is up and running. It should report a single available TaskManager instance.

Note: Before running the examples below, make sure the local cluster is up and running.

Word Count

Prepare the input data:

{% highlight bash %}
$ cat /tmp/input.csv
hello
flink
hello
sql
hello
world
{% endhighlight %}

Then start the SQL Client shell:

{% highlight bash %}
$ ./bin/sql-client.sh embedded
{% endhighlight %}

The shell displays the Flink SQL Client welcome message.

Paste the following SQL DDL statement into the shell. (For more information about SQL DDL, refer to [SQL]({{ site.baseurl }}/dev/table/sql.html) and [Supported DDL]({{ site.baseurl }}/dev/table/supported_ddl.html).)

{% highlight sql %}
create table csv_source (
  a varchar
) with (
  type = 'csv',
  path = 'file:///tmp/input.csv'
);
{% endhighlight %}

Press Enter, then paste the following SQL DDL statement:

{% highlight sql %}
create table csv_sink (
  a varchar,
  c bigint
) with (
  type = 'csv',
  updatemode = 'upsert',
  path = 'file:///tmp/output.csv'
);
{% endhighlight %}

Press Enter, then paste the following SQL DML statement:

{% highlight sql %}
insert into csv_sink
select a, count(*)
from csv_source
group by a;
{% endhighlight %}

After you press Enter, the statement is submitted to the standalone cluster and the log is printed in the shell.

Open http://localhost:8081 to see the dashboard.

Click the job name "default: insert into..." to open the job's detail page.

Run the following command to see the output:

{% highlight bash %}
$ cat /tmp/output.csv
{% endhighlight %}
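To make clear what the word count query computes, the following plain-Java sketch mimics `select a, count(*) from csv_source group by a` without using Flink. It assumes that /tmp/input.csv holds one value of column a per line. The names `WordCountSketch` and `countByValue` are hypothetical and exist only for illustration, and the actual layout of the rows in /tmp/output.csv may differ from what this sketch prints.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of: select a, count(*) from csv_source group by a
// WordCountSketch and countByValue are hypothetical names, not Flink APIs.
final class WordCountSketch {

    // Groups equal values and counts them, keeping first-seen order.
    static Map<String, Long> countByValue(List<String> rows) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String row : rows) {
            // Every incoming row updates the count stored for its key,
            // which is roughly what an upsert sink does per key.
            counts.merge(row, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Assumed content of /tmp/input.csv: one value of column a per line.
        List<String> rows =
            Arrays.asList("hello", "flink", "hello", "sql", "hello", "world");
        countByValue(rows)
            .forEach((word, count) -> System.out.println(word + "," + count));
    }
}
```

Under that assumption, the sketch reports a count of 3 for hello and 1 for each of flink, sql, and world.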

Note that non-local file systems require a scheme prefix, such as hdfs://.

PV-UV Statistics (Input: Kafka)

This example reads website access records from Kafka and computes page views (PV) and unique visitors (UV) in real time. Each record is a comma-separated line containing the visit time, user id, visited page, and browser type.

Start the SQL Client shell:

{% highlight bash %}
$ ./bin/sql-client.sh embedded
{% endhighlight %}

The shell displays the Flink SQL Client welcome message.

Paste the following SQL DDL statement into the shell. (For more information about SQL DDL, refer to [SQL]({{ site.baseurl }}/dev/table/sql.html) and [Supported DDL]({{ site.baseurl }}/dev/table/supported_ddl.html).)

Note: Replace the values of bootstrap.servers and group.id to match your own environment.

{% highlight sql %}
create table kafka_source (
  messageKey varbinary,
  message varbinary,
  topic varchar,
  partition int,
  offset bigint
) with (
  type = 'kafka010',
  topic = 'pvuv_demo',
  bootstrap.servers = 'YOUR_BROKER_IP:YOUR_BROKER_PORT',
  group.id = 'kafka_consumer_demo_group'
);
{% endhighlight %}

Press Enter, then paste the following SQL DML statement:

{% highlight sql %}
select
  date_format(visit_time, 'yyyy-MM-dd HH:mm') as visit_time,
  count(user_id) as pv,
  count(distinct user_id) as uv
from (
  select
    split_index(cast(message as varchar), ',', 0) as visit_time,
    split_index(cast(message as varchar), ',', 1) as user_id,
    split_index(cast(message as varchar), ',', 2) as visit_page,
    split_index(cast(message as varchar), ',', 3) as browser_type
  from kafka_source
)
group by date_format(visit_time, 'yyyy-MM-dd HH:mm');
{% endhighlight %}

After you press Enter, the query is submitted to the standalone cluster. Since the Kafka topic contains no data yet, there is no output.

Then run the kafka-console-producer.sh script from your local Kafka installation and enter a few sample records.

Note: Replace the value of --broker-list to match your own environment.

{% highlight bash %}
$ ./bin/kafka-console-producer.sh --topic pvuv_demo --broker-list YOUR_BROKER_IP:YOUR_BROKER_PORT
>2018-10-16 09:00:00,1001,/page1,chrome
>2018-10-16 09:00:02,1001,/page2,safari
>2018-10-16 09:00:07,1005,/page1,safari
>2018-10-16 09:01:30,1001,/page1,chrome
{% endhighlight %}

The SQL Client then shows the PV and UV counts for each minute as the query result.
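As a rough cross-check of what the query should report for the four sample records, the following plain-Java sketch reproduces the aggregation without Flink or Kafka. It assumes well-formed records, so truncating the visit time to its first 16 characters stands in for `date_format(visit_time, 'yyyy-MM-dd HH:mm')`. The names `PvUvSketch` and `pvUvPerMinute` are hypothetical and not part of any Flink API.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Plain-Java illustration of the PV-UV query; PvUvSketch is a hypothetical name.
final class PvUvSketch {

    // Returns minute -> {pv, uv}, where pv counts all visits in that minute
    // and uv counts the distinct user ids in that minute.
    static Map<String, long[]> pvUvPerMinute(List<String> messages) {
        Map<String, Long> pv = new TreeMap<>();
        Map<String, Set<String>> users = new TreeMap<>();
        for (String message : messages) {
            // Record layout: visit_time,user_id,visit_page,browser_type
            String[] fields = message.split(",");
            // Assumption: "yyyy-MM-dd HH:mm:ss" input, so the first 16
            // characters are the minute-level grouping key.
            String minute = fields[0].substring(0, 16);
            String userId = fields[1];
            pv.merge(minute, 1L, Long::sum);
            users.computeIfAbsent(minute, k -> new HashSet<>()).add(userId);
        }
        Map<String, long[]> result = new TreeMap<>();
        for (Map.Entry<String, Long> e : pv.entrySet()) {
            long uv = users.get(e.getKey()).size();
            result.put(e.getKey(), new long[] {e.getValue(), uv});
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList(
            "2018-10-16 09:00:00,1001,/page1,chrome",
            "2018-10-16 09:00:02,1001,/page2,safari",
            "2018-10-16 09:00:07,1005,/page1,safari",
            "2018-10-16 09:01:30,1001,/page1,chrome");
        pvUvPerMinute(messages).forEach((minute, counts) ->
            System.out.println(minute + " pv=" + counts[0] + " uv=" + counts[1]));
    }
}
```

For the sample records, the minute 09:00 has three visits from two distinct users, and the minute 09:01 has one visit from one user.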

Open http://localhost:8081 to see the dashboard.

Click the job name "default: select date_format..." to open the job's detail page.

For more information please refer to [SQL]({{ site.baseurl }}/dev/table/sql.html) and [SQL Client]({{ site.baseurl }}/dev/table/sqlClient.html).

Submit SQL Query Programmatically

SQL queries can be submitted programmatically using the sqlQuery() method of the TableEnvironment.

StreamJoinSQLExample shows how to use a SQL join on stream tables. It finds the orders that were shipped within one hour of the order time. The algorithm works in two steps: first, join the Order table and the Shipment table on the orderId field; second, filter the join result by createTime.

{% highlight java %}
// set up the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

DataStream<Order> order = env.fromElements(
    new Order(Timestamp.valueOf("2018-10-15 09:01:20"), 2, 1, 7),
    new Order(Timestamp.valueOf("2018-10-15 09:05:02"), 3, 2, 9),
    new Order(Timestamp.valueOf("2018-10-15 09:05:02"), 1, 3, 9),
    new Order(Timestamp.valueOf("2018-10-15 10:07:22"), 1, 4, 9),
    new Order(Timestamp.valueOf("2018-10-15 10:55:01"), 5, 5, 8));

DataStream<Shipment> shipment = env.fromElements(
    new Shipment(Timestamp.valueOf("2018-10-15 09:11:00"), 3),
    new Shipment(Timestamp.valueOf("2018-10-15 10:01:21"), 1),
    new Shipment(Timestamp.valueOf("2018-10-15 11:31:10"), 5));

// register the DataStreams under the names "t_order" and "t_shipment"
tEnv.registerDataStream("t_order", order, "createTime, unit, orderId, productId");
tEnv.registerDataStream("t_shipment", shipment, "createTime, orderId");

// run a SQL query to get the orders whose ship date is within one hour of the order date
Table table = tEnv.sqlQuery(
    "SELECT o.createTime, o.productId, o.orderId, s.createTime AS shipTime" +
    " FROM t_order AS o" +
    " JOIN t_shipment AS s" +
    " ON o.orderId = s.orderId" +
    " AND s.createTime BETWEEN o.createTime AND o.createTime + INTERVAL '1' HOUR");

// convert the result table to an append stream and print it
DataStream<Row> resultDataStream = tEnv.toAppendStream(table, Row.class);
resultDataStream.print();

// execute program
env.execute();
{% endhighlight %}

The {% gh_link flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/StreamJoinSQLExample.java "StreamJoinSQLExample" %} implements the algorithm described above.

{% highlight scala %}
// set up the execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

val t_order: DataStream[Order] = env.fromCollection(Seq(
  Order(Timestamp.valueOf("2018-10-15 09:01:20"), 2, 1, 7),
  Order(Timestamp.valueOf("2018-10-15 09:05:02"), 3, 2, 9),
  Order(Timestamp.valueOf("2018-10-15 09:05:02"), 1, 3, 9),
  Order(Timestamp.valueOf("2018-10-15 10:07:22"), 1, 4, 9),
  Order(Timestamp.valueOf("2018-10-15 10:55:01"), 5, 5, 8)))

val t_shipment: DataStream[Shipment] = env.fromCollection(Seq(
  Shipment(Timestamp.valueOf("2018-10-15 09:11:00"), 3),
  Shipment(Timestamp.valueOf("2018-10-15 10:01:21"), 1),
  Shipment(Timestamp.valueOf("2018-10-15 11:31:10"), 5)))

// register the DataStreams under the names "t_order" and "t_shipment"
tEnv.registerDataStream("t_order", t_order, 'createTime, 'unit, 'orderId, 'productId)
tEnv.registerDataStream("t_shipment", t_shipment, 'createTime, 'orderId)

// run a SQL query to get the orders whose ship date is within one hour of the order date
val result = tEnv.sqlQuery(
  "SELECT o.createTime, o.productId, o.orderId, s.createTime AS shipTime" +
  " FROM t_order AS o" +
  " JOIN t_shipment AS s" +
  " ON o.orderId = s.orderId" +
  " AND s.createTime BETWEEN o.createTime AND o.createTime + INTERVAL '1' HOUR")

// print the result, as in the Java example
result.toAppendStream[Row].print()

// execute program
env.execute()

// user-defined case classes
case class Order(createTime: Timestamp, unit: Int, orderId: Long, productId: Long)

case class Shipment(createTime: Timestamp, orderId: Long)
{% endhighlight %}

The {% gh_link flink-examples/flink-examples-table/src/main/scala/org/apache/flink/table/examples/scala/StreamJoinSQLExample.scala "StreamJoinSQLExample.scala" %} implements the algorithm described above.
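Independent of Flink, the two steps of the algorithm (join on orderId, then filter by createTime) can be sketched in plain Java to see which of the sample orders qualify. This is only an illustration of the join condition, not how Flink executes the query. The class `JoinWithinHourSketch`, its simplified nested `Order` and `Shipment` classes, and the method `shippedWithinOneHour` are hypothetical names introduced here.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration of the join condition; all names are hypothetical.
final class JoinWithinHourSketch {

    static final class Order {
        final LocalDateTime createTime;
        final long orderId;
        Order(String createTime, long orderId) {
            this.createTime = LocalDateTime.parse(createTime);
            this.orderId = orderId;
        }
    }

    static final class Shipment {
        final LocalDateTime createTime;
        final long orderId;
        Shipment(String createTime, long orderId) {
            this.createTime = LocalDateTime.parse(createTime);
            this.orderId = orderId;
        }
    }

    // Returns the ids of orders shipped between createTime and createTime + 1 hour.
    static List<Long> shippedWithinOneHour(List<Order> orders, List<Shipment> shipments) {
        List<Long> result = new ArrayList<>();
        for (Order o : orders) {
            for (Shipment s : shipments) {
                // Step 1: join on orderId.
                if (o.orderId != s.orderId) {
                    continue;
                }
                // Step 2: BETWEEN is inclusive on both bounds.
                boolean notBefore = !s.createTime.isBefore(o.createTime);
                boolean notAfter = !s.createTime.isAfter(o.createTime.plusHours(1));
                if (notBefore && notAfter) {
                    result.add(o.orderId);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Same timestamps and order ids as the example data above.
        List<Order> orders = Arrays.asList(
            new Order("2018-10-15T09:01:20", 1),
            new Order("2018-10-15T09:05:02", 2),
            new Order("2018-10-15T09:05:02", 3),
            new Order("2018-10-15T10:07:22", 4),
            new Order("2018-10-15T10:55:01", 5));
        List<Shipment> shipments = Arrays.asList(
            new Shipment("2018-10-15T09:11:00", 3),
            new Shipment("2018-10-15T10:01:21", 1),
            new Shipment("2018-10-15T11:31:10", 5));
        System.out.println(shippedWithinOneHour(orders, shipments)); // prints [3, 5]
    }
}
```

Order 1 is excluded because its shipment arrives one second after the one-hour bound, and orders 2 and 4 have no matching shipment at all.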

To run the StreamJoinSQLExample, issue the following command:

{% highlight bash %}
$ ./bin/flink run ./examples/table/StreamJoinSQLExample.jar
{% endhighlight %}

Open http://localhost:8081 to see the dashboard.

Run the following command to see the result:

{% highlight bash %}
$ tail -f ./log/flink-*-taskexecutor*.out
{% endhighlight %}

