---
title: "Multiple TableSink Optimization"
nav-parent_id: tableapi
nav-pos: 110
---

Multiple TableSinks are needed when a Flink job emits multiple results to different external storage systems. Ideally, multiple TableSinks can be defined without the operators they have in common being executed repeatedly.

This page describes how to avoid executing common operators repeatedly when a job has multiple TableSinks.

The following example shows a Flink job which has multiple TableSinks.

{% highlight java %}
// tEnv is the job's TableEnvironment
// register Orders table

// compute revenue for all customers from France
Table revenue = tEnv.sqlQuery(
  "SELECT cID, cName, SUM(revenue) AS revSum " +
  "FROM Orders " +
  "WHERE cCountry = 'FRANCE' " +
  "GROUP BY cID, cName"
);
// register a Table
tEnv.registerTable("T", revenue);

// define first TableSink
TableSink csvSink1 = new CsvTableSink("/path/to/file1", ...);

// compute customers with high purchasing ability from France
Table result1 = tEnv.sqlQuery(
  "SELECT * FROM T WHERE revSum >= 100000"
);
// emit result1 to sink1
result1.writeToSink(csvSink1);

// define second TableSink
TableSink csvSink2 = new CsvTableSink("/path/to/file2", ...);

// compute customers with good purchasing ability from France
Table result2 = tEnv.sqlQuery(
  "SELECT * FROM T WHERE revSum < 100000 AND revSum > 20000"
);
// emit result2 to sink2
result2.writeToSink(csvSink2);

// execute query
tEnv.execute();
{% endhighlight %}

{% highlight scala %}
// tEnv is the job's TableEnvironment
// register Orders table

// compute revenue for all customers from France
val revenue = tEnv.sqlQuery(
  "SELECT cID, cName, SUM(revenue) AS revSum " +
  "FROM Orders " +
  "WHERE cCountry = 'FRANCE' " +
  "GROUP BY cID, cName"
)
// register a Table
tEnv.registerTable("T", revenue)

// define first TableSink
val csvSink1 = new CsvTableSink("/path/to/file1", ...)

// compute customers with high purchasing ability from France
val result1 = tEnv.sqlQuery(
  "SELECT * FROM T WHERE revSum >= 100000"
)
// emit result1 to sink1
result1.writeToSink(csvSink1)

// define second TableSink
val csvSink2 = new CsvTableSink("/path/to/file2", ...)

// compute customers with good purchasing ability from France
val result2 = tEnv.sqlQuery(
  "SELECT * FROM T WHERE revSum < 100000 AND revSum > 20000"
)
// emit result2 to sink2
result2.writeToSink(csvSink2)

// execute query
tEnv.execute()
{% endhighlight %}
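Both queries in the example read from `T`, so the filter on France and the aggregation are common to the two sinks. The following is a conceptual sketch in plain Java of what sharing that common part means. It does not use any Flink API: the class `SubplanReuseSketch`, its methods, and the sample data are all hypothetical and exist only to count how often the shared aggregation runs.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Conceptual sketch only: plain Java, not Flink API. All names are hypothetical.
class SubplanReuseSketch {

    static class Order {
        final String cName; final String cCountry; final long revenue;
        Order(String cName, String cCountry, long revenue) {
            this.cName = cName; this.cCountry = cCountry; this.revenue = revenue;
        }
    }

    // Counts how many times the shared aggregation is executed.
    static int aggregationRuns = 0;

    // Stand-in for the common operators: keep French orders, sum revenue per customer.
    static Map<String, Long> revenueForFrance(List<Order> orders) {
        aggregationRuns++;
        Map<String, Long> revSum = new LinkedHashMap<>();
        for (Order o : orders) {
            if ("FRANCE".equals(o.cCountry)) {
                revSum.merge(o.cName, o.revenue, Long::sum);
            }
        }
        return revSum;
    }

    // Stand-in for the first query: revSum >= 100000.
    static List<String> high(Map<String, Long> t) {
        return t.entrySet().stream()
            .filter(e -> e.getValue() >= 100000)
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
    }

    // Stand-in for the second query: 20000 < revSum < 100000.
    static List<String> good(Map<String, Long> t) {
        return t.entrySet().stream()
            .filter(e -> e.getValue() < 100000 && e.getValue() > 20000)
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
    }

    // Without reuse: each sink's query re-executes the aggregation.
    static int runWithoutReuse(List<Order> orders) {
        aggregationRuns = 0;
        high(revenueForFrance(orders));
        good(revenueForFrance(orders));
        return aggregationRuns;
    }

    // With reuse: the aggregation runs once and its result feeds both queries.
    static int runWithReuse(List<Order> orders) {
        aggregationRuns = 0;
        Map<String, Long> t = revenueForFrance(orders);
        high(t);
        good(t);
        return aggregationRuns;
    }

    static List<Order> sampleOrders() {
        return Arrays.asList(
            new Order("Alice", "FRANCE", 80000),
            new Order("Alice", "FRANCE", 40000),
            new Order("Bob", "FRANCE", 50000),
            new Order("Carol", "GERMANY", 200000),
            new Order("Dave", "FRANCE", 10000));
    }

    public static void main(String[] args) {
        List<Order> orders = sampleOrders();
        System.out.println("aggregation runs without reuse: " + runWithoutReuse(orders));
        System.out.println("aggregation runs with reuse: " + runWithReuse(orders));
    }
}
```

In this sketch the two variants produce the same results for both sinks and differ only in how often the shared aggregation is executed.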

Note: It is important to enable subsection optimization if a job has multiple TableSinks. In the example above, the operators that compute the revenue for all French customers are reused when subsection optimization is enabled. The following picture shows the difference between the job with subsection optimization enabled and the one without it.

{% top %}