[FLINK-17599][docs] Update documents due to FLIP-84
diff --git a/docs/dev/table/catalogs.md b/docs/dev/table/catalogs.md
index 69b7e46..4ff3917 100644
--- a/docs/dev/table/catalogs.md
+++ b/docs/dev/table/catalogs.md
@@ -68,8 +68,6 @@
 
 Users can use SQL DDL to create tables in catalogs in both Table API and SQL.
 
-For Table API:
-
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
@@ -82,19 +80,36 @@
 tableEnv.registerCatalog("myhive", catalog);
 
 // Create a catalog database
-tableEnv.sqlUpdate("CREATE DATABASE mydb WITH (...)");
+tableEnv.executeSql("CREATE DATABASE mydb WITH (...)");
 
 // Create a catalog table
-tableEnv.sqlUpdate("CREATE TABLE mytable (name STRING, age INT) WITH (...)");
+tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)");
 
 tableEnv.listTables(); // should return the tables in current catalog and database.
 
 {% endhighlight %}
 </div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val tableEnv = ...
+
+// Create a HiveCatalog 
+val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_version>")
+
+// Register the catalog
+tableEnv.registerCatalog("myhive", catalog)
+
+// Create a catalog database
+tableEnv.executeSql("CREATE DATABASE mydb WITH (...)")
+
+// Create a catalog table
+tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)")
+
+tableEnv.listTables() // should return the tables in current catalog and database.
+
+{% endhighlight %}
 </div>
-
-For SQL Client:
-
+<div data-lang="SQL Client" markdown="1">
 {% highlight sql %}
 // the catalog should have been registered via yaml file
 Flink SQL> CREATE DATABASE mydb WITH (...);
@@ -104,17 +119,25 @@
 Flink SQL> SHOW TABLES;
 mytable
 {% endhighlight %}
+</div>
+</div>
+
 
 For detailed information, please check out [Flink SQL CREATE DDL]({{ site.baseurl }}/dev/table/sql/create.html).
 
-### Using Java/Scala/Python API
+### Using Java/Scala
 
-Users can use Java, Scala, or Python API to create catalog tables programmatically.
+Users can use Java or Scala to create catalog tables programmatically.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-TableEnvironment tableEnv = ...
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.catalog.*;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.descriptors.Kafka;
+
+TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
 
 // Create a HiveCatalog 
 Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_version>");
@@ -123,7 +146,7 @@
 tableEnv.registerCatalog("myhive", catalog);
 
 // Create a catalog database 
-catalog.createDatabase("mydb", new CatalogDatabaseImpl(...))
+catalog.createDatabase("mydb", new CatalogDatabaseImpl(...));
 
 // Create a catalog table
 TableSchema schema = TableSchema.builder()
@@ -138,15 +161,58 @@
             new Kafka()
                 .version("0.11")
                 ....
-                .startFromEarlist(),
+                .startFromEarliest()
+                .toProperties(),
             "my comment"
-        )
+        ),
+        false
     );
     
 List<String> tables = catalog.listTables("mydb"); // tables should contain "mytable"
 {% endhighlight %}
 
 </div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.table.api._
+import org.apache.flink.table.catalog._
+import org.apache.flink.table.catalog.hive.HiveCatalog
+import org.apache.flink.table.descriptors.Kafka
+
+val tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance.build)
+
+// Create a HiveCatalog 
+val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_version>")
+
+// Register the catalog
+tableEnv.registerCatalog("myhive", catalog)
+
+// Create a catalog database 
+catalog.createDatabase("mydb", new CatalogDatabaseImpl(...))
+
+// Create a catalog table
+val schema = TableSchema.builder()
+    .field("name", DataTypes.STRING())
+    .field("age", DataTypes.INT())
+    .build()
+
+catalog.createTable(
+        new ObjectPath("mydb", "mytable"), 
+        new CatalogTableImpl(
+            schema,
+            new Kafka()
+                .version("0.11")
+                ....
+                .startFromEarliest()
+                .toProperties(),
+            "my comment"
+        ),
+        false
+    )
+    
+val tables = catalog.listTables("mydb") // tables should contain "mytable"
+{% endhighlight %}
+</div>
 </div>
 
 ## Catalog API
@@ -158,7 +224,7 @@
 ### Database operations
 
 <div class="codetabs" markdown="1">
-<div data-lang="Java" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 // create database
 catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);
@@ -184,7 +250,7 @@
 ### Table operations
 
 <div class="codetabs" markdown="1">
-<div data-lang="Java" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 // create table
 catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
@@ -213,7 +279,7 @@
 ### View operations
 
 <div class="codetabs" markdown="1">
-<div data-lang="Java" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 // create view
 catalog.createTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);
@@ -243,7 +309,7 @@
 ### Partition operations
 
 <div class="codetabs" markdown="1">
-<div data-lang="Java" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
-// create view
+// create partition
 catalog.createPartition(
@@ -284,7 +350,7 @@
 ### Function operations
 
 <div class="codetabs" markdown="1">
-<div data-lang="Java" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 // create function
 catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
diff --git a/docs/dev/table/catalogs.zh.md b/docs/dev/table/catalogs.zh.md
index fd3cafd..5bd8e56 100644
--- a/docs/dev/table/catalogs.zh.md
+++ b/docs/dev/table/catalogs.zh.md
@@ -64,8 +64,6 @@
 
 用户可以使用 DDL 通过 Table API 或者 SQL Client 在 Catalog 中创建表。
 
-使用 Table API:
-
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
@@ -78,19 +76,36 @@
 tableEnv.registerCatalog("myhive", catalog);
 
 // Create a catalog database
-tableEnv.sqlUpdate("CREATE DATABASE mydb WITH (...)");
+tableEnv.executeSql("CREATE DATABASE mydb WITH (...)");
 
 // Create a catalog table
-tableEnv.sqlUpdate("CREATE TABLE mytable (name STRING, age INT) WITH (...)");
+tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)");
 
 tableEnv.listTables(); // should return the tables in current catalog and database.
 
 {% endhighlight %}
 </div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val tableEnv = ...
+
+// Create a HiveCatalog 
+val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_version>")
+
+// Register the catalog
+tableEnv.registerCatalog("myhive", catalog)
+
+// Create a catalog database
+tableEnv.executeSql("CREATE DATABASE mydb WITH (...)")
+
+// Create a catalog table
+tableEnv.executeSql("CREATE TABLE mytable (name STRING, age INT) WITH (...)")
+
+tableEnv.listTables() // should return the tables in current catalog and database.
+
+{% endhighlight %}
 </div>
-
-使用 SQL Client:
-
+<div data-lang="SQL Client" markdown="1">
 {% highlight sql %}
 // the catalog should have been registered via yaml file
 Flink SQL> CREATE DATABASE mydb WITH (...);
@@ -100,17 +115,25 @@
 Flink SQL> SHOW TABLES;
 mytable
 {% endhighlight %}
+</div>
+</div>
+
 
 更多详细信息,请参考[Flink SQL CREATE DDL]({{ site.baseurl }}/zh/dev/table/sql/create.html)。
 
-### 使用 Java/Scala/Python API
+### 使用 Java/Scala
 
-用户可以用编程的方式使用Java、Scala 或者 Python API 来创建 Catalog 表。
+用户可以用编程的方式使用Java 或者 Scala 来创建 Catalog 表。
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-TableEnvironment tableEnv = ...
+import org.apache.flink.table.api.*;
+import org.apache.flink.table.catalog.*;
+import org.apache.flink.table.catalog.hive.HiveCatalog;
+import org.apache.flink.table.descriptors.Kafka;
+
+TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
 
 // Create a HiveCatalog
 Catalog catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_version>");
@@ -119,7 +142,7 @@
 tableEnv.registerCatalog("myhive", catalog);
 
 // Create a catalog database
-catalog.createDatabase("mydb", new CatalogDatabaseImpl(...))
+catalog.createDatabase("mydb", new CatalogDatabaseImpl(...));
 
 // Create a catalog table
 TableSchema schema = TableSchema.builder()
@@ -134,15 +157,59 @@
             new Kafka()
                 .version("0.11")
                 ....
-                .startFromEarlist(),
+                .startFromEarliest()
+                .toProperties(),
             "my comment"
-        )
+        ),
+        false
     );
 
 List<String> tables = catalog.listTables("mydb"); // tables should contain "mytable"
 {% endhighlight %}
 
 </div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.table.api._
+import org.apache.flink.table.catalog._
+import org.apache.flink.table.catalog.hive.HiveCatalog
+import org.apache.flink.table.descriptors.Kafka
+
+val tableEnv = TableEnvironment.create(EnvironmentSettings.newInstance.build)
+
+// Create a HiveCatalog
+val catalog = new HiveCatalog("myhive", null, "<path_of_hive_conf>", "<hive_version>")
+
+// Register the catalog
+tableEnv.registerCatalog("myhive", catalog)
+
+// Create a catalog database
+catalog.createDatabase("mydb", new CatalogDatabaseImpl(...))
+
+// Create a catalog table
+val schema = TableSchema.builder()
+    .field("name", DataTypes.STRING())
+    .field("age", DataTypes.INT())
+    .build()
+
+catalog.createTable(
+        new ObjectPath("mydb", "mytable"),
+        new CatalogTableImpl(
+            schema,
+            new Kafka()
+                .version("0.11")
+                ....
+                .startFromEarliest()
+                .toProperties(),
+            "my comment"
+        ),
+        false
+    )
+
+val tables = catalog.listTables("mydb") // tables should contain "mytable"
+{% endhighlight %}
+
+</div>
 </div>
 
 ## Catalog API
@@ -154,7 +221,7 @@
 ### 数据库操作
 
 <div class="codetabs" markdown="1">
-<div data-lang="Java" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 // create database
 catalog.createDatabase("mydb", new CatalogDatabaseImpl(...), false);
@@ -180,7 +247,7 @@
 ### 表操作
 
 <div class="codetabs" markdown="1">
-<div data-lang="Java" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 // create table
 catalog.createTable(new ObjectPath("mydb", "mytable"), new CatalogTableImpl(...), false);
@@ -209,7 +276,7 @@
 ### 视图操作
 
 <div class="codetabs" markdown="1">
-<div data-lang="Java" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 // create view
 catalog.createTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);
@@ -239,7 +306,7 @@
 ### 分区操作
 
 <div class="codetabs" markdown="1">
-<div data-lang="Java" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
-// create view
+// create partition
 catalog.createPartition(
@@ -280,7 +347,7 @@
 ### 函数操作
 
 <div class="codetabs" markdown="1">
-<div data-lang="Java" markdown="1">
+<div data-lang="Java/Scala" markdown="1">
 {% highlight java %}
 // create function
 catalog.createFunction(new ObjectPath("mydb", "myfunc"), new CatalogFunctionImpl(...), false);
diff --git a/docs/dev/table/common.md b/docs/dev/table/common.md
index a6fe84f..40bd4c8 100644
--- a/docs/dev/table/common.md
+++ b/docs/dev/table/common.md
@@ -35,7 +35,7 @@
 3. The implementations of `FilterableTableSource` for the old planner and the Blink planner are incompatible. The old planner will push down `PlannerExpression`s into `FilterableTableSource`, while the Blink planner will push down `Expression`s.
 4. String based key-value config options (Please see the documentation about [Configuration]({{ site.baseurl }}/dev/table/config.html) for details) are only used for the Blink planner.
 5. The implementation(`CalciteConfig`) of `PlannerConfig` in two planners is different.
-6. The Blink planner will optimize multiple-sinks into one DAG (supported only on `TableEnvironment`, not on `StreamTableEnvironment`). The old planner will always optimize each sink into a new DAG, where all DAGs are independent of each other.
+6. The Blink planner will optimize multiple sinks into one DAG on both `TableEnvironment` and `StreamTableEnvironment`. The old planner will always optimize each sink into a new DAG, where all DAGs are independent of each other.
 7. The old planner does not support catalog statistics now, while the Blink planner does.
 
 
@@ -62,10 +62,8 @@
 Table sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table1 ... ");
 
 // emit a Table API result Table to a TableSink, same for SQL result
-tapiResult.insertInto("outputTable");
-
-// execute
-tableEnv.execute("java_job");
+TableResult tableResult = tapiResult.executeInsert("outputTable");
+tableResult...
 
 {% endhighlight %}
 </div>
@@ -87,10 +85,8 @@
 val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table1 ...")
 
 // emit a Table API result Table to a TableSink, same for SQL result
-tapiResult.insertInto("outputTable")
-
-// execute
-tableEnv.execute("scala_job")
+val tableResult = tapiResult.executeInsert("outputTable")
+tableResult...
 
 {% endhighlight %}
 </div>
@@ -113,10 +109,8 @@
 sql_result  = table_env.sql_query("SELECT ... FROM table1 ...")
 
 # emit a Table API result Table to a TableSink, same for SQL result
-tapi_result.insert_into("outputTable")
-
-# execute
-table_env.execute("python_job")
+table_result = tapi_result.execute_insert("outputTable")
+table_result...
 
 {% endhighlight %}
 </div>
@@ -425,7 +419,7 @@
 
 <div data-lang="DDL" markdown="1">
 {% highlight sql %}
-tableEnvironment.sqlUpdate("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)")
+tableEnvironment.executeSql("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)")
 {% endhighlight %}
 </div>
 </div>
@@ -662,7 +656,7 @@
 // register "RevenueFrance" output table
 
 // compute revenue for all customers from France and emit to "RevenueFrance"
-tableEnv.sqlUpdate(
+tableEnv.executeSql(
     "INSERT INTO RevenueFrance " +
     "SELECT cID, cName, SUM(revenue) AS revSum " +
     "FROM Orders " +
@@ -670,7 +664,6 @@
     "GROUP BY cID, cName"
   );
 
-// execute query
 {% endhighlight %}
 </div>
 
@@ -683,7 +676,7 @@
 // register "RevenueFrance" output table
 
 // compute revenue for all customers from France and emit to "RevenueFrance"
-tableEnv.sqlUpdate("""
+tableEnv.executeSql("""
   |INSERT INTO RevenueFrance
   |SELECT cID, cName, SUM(revenue) AS revSum
   |FROM Orders
@@ -691,7 +684,6 @@
   |GROUP BY cID, cName
   """.stripMargin)
 
-// execute query
 {% endhighlight %}
 
 </div>
@@ -705,7 +697,7 @@
 # register "RevenueFrance" output table
 
 # compute revenue for all customers from France and emit to "RevenueFrance"
-table_env.sql_update(
+table_env.execute_sql(
     "INSERT INTO RevenueFrance "
     "SELECT cID, cName, SUM(revenue) AS revSum "
     "FROM Orders "
@@ -713,7 +705,6 @@
     "GROUP BY cID, cName"
 )
 
-# execute query
 {% endhighlight %}
 </div>
 </div>
@@ -738,7 +729,7 @@
 
 Please see the documentation about [Table Sources & Sinks]({{ site.baseurl }}/dev/table/sourceSinks.html) for details about available sinks and instructions for how to implement a custom `TableSink`.
 
-The `Table.insertInto(String tableName)` method emits the `Table` to a registered `TableSink`. The method looks up the `TableSink` from the catalog by the name and validates that the schema of the `Table` is identical to the schema of the `TableSink`. 
+The `Table.executeInsert(String tableName)` method emits the `Table` to a registered `TableSink`. The method looks up the `TableSink` from the catalog by the name and validates that the schema of the `Table` is identical to the schema of the `TableSink`. 
 
 The following examples shows how to emit a `Table`:
 
@@ -761,10 +752,10 @@
 
 // compute a result Table using Table API operators and/or SQL queries
 Table result = ...
-// emit the result Table to the registered TableSink
-result.insertInto("CsvSinkTable");
 
-// execute the program
+// emit the result Table to the registered TableSink
+result.executeInsert("CsvSinkTable");
+
 {% endhighlight %}
 </div>
 
@@ -788,9 +779,8 @@
 val result: Table = ...
 
 // emit the result Table to the registered TableSink
-result.insertInto("CsvSinkTable")
+result.executeInsert("CsvSinkTable")
 
-// execute the program
 {% endhighlight %}
 </div>
 
@@ -800,7 +790,7 @@
 table_env = ... # see "Create a TableEnvironment" section
 
 # create a TableSink
-t_env.connect(FileSystem().path("/path/to/file")))
+table_env.connect(FileSystem().path("/path/to/file")) \
     .with_format(Csv()
                  .field_delimiter(',')
                  .deriveSchema())
@@ -814,9 +804,8 @@
 result = ...
 
 # emit the result Table to the registered TableSink
-result.insert_into("CsvSinkTable")
+result.execute_insert("CsvSinkTable")
 
-# execute the program
 {% endhighlight %}
 </div>
 </div>
@@ -839,8 +828,14 @@
 
 a Table API or SQL query is translated when:
 
-* `TableEnvironment.execute()` is called. A `Table` (emitted to a `TableSink` through `Table.insertInto()`) or a SQL update query (specified through `TableEnvironment.sqlUpdate()`) will be buffered in `TableEnvironment` first. All sinks will be optimized into one DAG.
+* `TableEnvironment.executeSql()` is called. This method is used for executing a given statement, and the SQL statement is translated immediately once this method is called.
+* `Table.executeInsert()` is called. This method is used for inserting the table content into the given sink table, and the Table API program is translated immediately once this method is called.
+* `Table.execute()` is called. This method is used for collecting the table content to the local client, and the Table API program is translated immediately once this method is called.
+* `StatementSet.execute()` is called. A `Table` (emitted to a sink through `StatementSet.addInsert()`) or an INSERT statement (specified through `StatementSet.addInsertSql()`) will be buffered in `StatementSet` first. They are translated once `StatementSet.execute()` is called, and all sinks will be optimized into one DAG (see the sketch after this list).
 * A `Table` is translated when it is converted into a `DataStream` (see [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api)). Once translated, it's a regular DataStream program and is executed when `StreamExecutionEnvironment.execute()` is called.
+
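+For example, a minimal sketch of this buffering (assuming the sink tables `MySink1` and `MySink2` and the `Table` object `table` already exist; the names are only illustrative):
+
+{% highlight java %}
+StatementSet stmtSet = tableEnv.createStatementSet();
+
+// nothing is translated yet, both statements are only buffered in the StatementSet
+stmtSet.addInsertSql("INSERT INTO MySink1 SELECT ...");
+stmtSet.addInsert("MySink2", table);
+
+// translation and optimization of all buffered sinks into one DAG happen here
+TableResult result = stmtSet.execute();
+{% endhighlight %}
+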
+<span class="label label-danger">Attention</span> **Since 1.11 version, `sqlUpdate()` method and `insertInto()` method are deprecated. If the Table program is built from these two methods, we must use `StreamTableEnvironment.execute()` method instead of `StreamExecutionEnvironment.execute()` method to execute it.**
+
 </div>
 
 <div data-lang="Old planner" markdown="1">
@@ -849,18 +844,16 @@
 1. Optimization of the logical plan
 2. Translation into a DataStream or DataSet program
 
-For streaming, a Table API or SQL query is translated when:
+A Table API or SQL query is translated when:
 
-* `TableEnvironment.execute()` is called. A `Table` (emitted to a `TableSink` through `Table.insertInto()`) or a SQL update query (specified through `TableEnvironment.sqlUpdate()`) will be buffered in `TableEnvironment` first. Each sink will be optimized independently. The execution graph contains multiple independent sub-DAGs.
-* A `Table` is translated when it is converted into a `DataStream` (see [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api)). Once translated, it's a regular DataStream program and is executed when `StreamExecutionEnvironment.execute()` is called.
+* `TableEnvironment.executeSql()` is called. This method is used for executing a given statement, and the SQL statement is translated immediately once this method is called.
+* `Table.executeInsert()` is called. This method is used for inserting the table content into the given sink table, and the Table API program is translated immediately once this method is called.
+* `Table.execute()` is called. This method is used for collecting the table content to the local client, and the Table API program is translated immediately once this method is called.
+* `StatementSet.execute()` is called. A `Table` (emitted to a sink through `StatementSet.addInsert()`) or an INSERT statement (specified through `StatementSet.addInsertSql()`) will be buffered in `StatementSet` first. They are translated once `StatementSet.execute()` is called. Each sink will be optimized independently. The execution graph contains multiple independent sub-DAGs.
+* For streaming, a `Table` is translated when it is converted into a `DataStream` (see [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api)). Once translated, it's a regular DataStream program and is executed when `StreamExecutionEnvironment.execute()` is called.
+* For batch, a `Table` is translated when it is converted into a `DataSet` (see [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api)). Once translated, it's a regular DataSet program and is executed when `ExecutionEnvironment.execute()` is called.
 
-For batch, a Table API or SQL query is translated when:
+<span class="label label-danger">Attention</span> **Since 1.11 version, `sqlUpdate()` method and `insertInto()` method are deprecated. For streaming, if the Table program is built from these two methods, we must use `StreamTableEnvironment.execute()` method instead of `StreamExecutionEnvironment.execute()` method to execute it. For batch, if the Table program is built from these two methods, we must use `BatchTableEnvironment.execute()` method instead of `ExecutionEnvironment.execute()` method to execute it.**
 
-* a `Table` is emitted to a `TableSink`, i.e., when `Table.insertInto()` is called.
-* a SQL update query is specified, i.e., when `TableEnvironment.sqlUpdate()` is called.
-* a `Table` is converted into a `DataSet` (see [Integration with DataStream and DataSet API](#integration-with-datastream-and-dataset-api)).
-
-Once translated, a Table API or SQL query is handled like a regular DataSet program and is executed when `ExecutionEnvironment.execute()` is called.
 </div>
 
 </div>
@@ -1039,7 +1032,9 @@
 </div>
 </div>
 
-**Note:** A detailed discussion about dynamic tables and their properties is given in the [Dynamic Tables](streaming/dynamic_tables.html) document.
+**Note:** A detailed discussion about dynamic tables and their properties is given in the [Dynamic Tables](streaming/dynamic_tables.html) document. 
+
+<span class="label label-danger">Attention</span> **Once the Table is converted to a DataStream, please use the `StreamExecutionEnvironment.execute()` method to execute the DataStream program.**
 
 #### Convert a Table into a DataSet
 
@@ -1084,6 +1079,8 @@
 </div>
 </div>
 
+<span class="label label-danger">Attention</span> **Once the Table is converted to a DataSet, we must use the ExecutionEnvironment.execute method to execute the DataSet program.**
+
 {% top %}
 
 ### Mapping of Data Types to Table Schema
@@ -1435,16 +1432,17 @@
 </div>
 
 
-### Explaining a Table
+Explaining a Table
+------------------
 
 The Table API provides a mechanism to explain the logical and optimized query plans to compute a `Table`. 
-This is done through the `TableEnvironment.explain(table)` method or `TableEnvironment.explain()` method. `explain(table)` returns the plan of a given `Table`. `explain()` returns the result of a multiple sinks plan and is mainly used for the Blink planner. It returns a String describing three plans:
+This is done through the `Table.explain()` method or the `StatementSet.explain()` method. `Table.explain()` returns the plan of a `Table`. `StatementSet.explain()` returns the plan of multiple sinks. The returned String describes three plans:
 
 1. the Abstract Syntax Tree of the relational query, i.e., the unoptimized logical query plan,
 2. the optimized logical query plan, and
 3. the physical execution plan.
 
-The following code shows an example and the corresponding output for given `Table` using `explain(table)`:
+The following code shows an example and the corresponding output for a given `Table` using the `Table.explain()` method:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -1455,14 +1453,14 @@
 DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
 DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));
 
+// explain Table API
 Table table1 = tEnv.fromDataStream(stream1, $("count"), $("word"));
 Table table2 = tEnv.fromDataStream(stream2, $("count"), $("word"));
 Table table = table1
   .where($("word").like("F%"))
   .unionAll(table2);
+System.out.println(table.explain());
 
-String explanation = tEnv.explain(table);
-System.out.println(explanation);
 {% endhighlight %}
 </div>
 
@@ -1476,9 +1474,8 @@
 val table = table1
   .where($"word".like("F%"))
   .unionAll(table2)
+println(table.explain())
 
-val explanation: String = tEnv.explain(table)
-println(explanation)
 {% endhighlight %}
 </div>
 
@@ -1492,16 +1489,16 @@
 table = table1 \
     .where("LIKE(word, 'F%')") \
     .union_all(table2)
+print(table.explain())
 
-explanation = t_env.explain(table)
-print(explanation)
 {% endhighlight %}
 </div>
 </div>
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
+The result of the above example is:
+<div>
 {% highlight text %}
+
 == Abstract Syntax Tree ==
 LogicalUnion(all=[true])
   LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
@@ -1532,97 +1529,11 @@
 			Stage 5 : Operator
 				content : from: (count, word)
 				ship_strategy : REBALANCE
+
 {% endhighlight %}
 </div>
 
-<div data-lang="scala" markdown="1">
-{% highlight text %}
-== Abstract Syntax Tree ==
-LogicalUnion(all=[true])
-  LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
-    FlinkLogicalDataStreamScan(id=[1], fields=[count, word])
-  FlinkLogicalDataStreamScan(id=[2], fields=[count, word])
-
-== Optimized Logical Plan ==
-DataStreamUnion(all=[true], union all=[count, word])
-  DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
-    DataStreamScan(id=[1], fields=[count, word])
-  DataStreamScan(id=[2], fields=[count, word])
-
-== Physical Execution Plan ==
-Stage 1 : Data Source
-	content : collect elements with CollectionInputFormat
-
-Stage 2 : Data Source
-	content : collect elements with CollectionInputFormat
-
-	Stage 3 : Operator
-		content : from: (count, word)
-		ship_strategy : REBALANCE
-
-		Stage 4 : Operator
-			content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
-			ship_strategy : FORWARD
-
-			Stage 5 : Operator
-				content : from: (count, word)
-				ship_strategy : REBALANCE
-{% endhighlight %}
-</div>
-
-<div data-lang="python" markdown="1">
-{% highlight text %}
-== Abstract Syntax Tree ==
-LogicalUnion(all=[true])
-  LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
-    FlinkLogicalDataStreamScan(id=[3], fields=[count, word])
-  FlinkLogicalDataStreamScan(id=[6], fields=[count, word])
-
-== Optimized Logical Plan ==
-DataStreamUnion(all=[true], union all=[count, word])
-  DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
-    DataStreamScan(id=[3], fields=[count, word])
-  DataStreamScan(id=[6], fields=[count, word])
-
-== Physical Execution Plan ==
-Stage 1 : Data Source
-	content : collect elements with CollectionInputFormat
-
-	Stage 2 : Operator
-		content : Flat Map
-		ship_strategy : FORWARD
-
-		Stage 3 : Operator
-			content : Map
-			ship_strategy : FORWARD
-
-Stage 4 : Data Source
-	content : collect elements with CollectionInputFormat
-
-	Stage 5 : Operator
-		content : Flat Map
-		ship_strategy : FORWARD
-
-		Stage 6 : Operator
-			content : Map
-			ship_strategy : FORWARD
-
-			Stage 7 : Operator
-				content : Map
-				ship_strategy : FORWARD
-
-				Stage 8 : Operator
-					content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
-					ship_strategy : FORWARD
-
-					Stage 9 : Operator
-						content : Map
-						ship_strategy : FORWARD
-{% endhighlight %}
-</div>
-</div>
-
-The following code shows an example and the corresponding output for multiple-sinks plan using `explain()`:
+The following code shows an example and the corresponding output for a multiple-sinks plan using the `StatementSet.explain()` method:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -1651,14 +1562,16 @@
     .withFormat(new Csv().deriveSchema())
     .withSchema(schema)
     .createTemporaryTable("MySink2");
+
+StatementSet stmtSet = tEnv.createStatementSet();
 
 Table table1 = tEnv.from("MySource1").where($("word").like("F%"));
-table1.insertInto("MySink1");
+stmtSet.addInsert("MySink1", table1);
 
 Table table2 = table1.unionAll(tEnv.from("MySource2"));
-table2.insertInto("MySink2");
+stmtSet.addInsert("MySink2", table2);
 
-String explanation = tEnv.explain(false);
+String explanation = stmtSet.explain();
 System.out.println(explanation);
 
 {% endhighlight %}
@@ -1689,14 +1602,16 @@
     .withFormat(new Csv().deriveSchema())
     .withSchema(schema)
     .createTemporaryTable("MySink2")
+
+val stmtSet = tEnv.createStatementSet()
 
 val table1 = tEnv.from("MySource1").where($"word".like("F%"))
-table1.insertInto("MySink1")
+stmtSet.addInsert("MySink1", table1)
 
 val table2 = table1.unionAll(tEnv.from("MySource2"))
-table2.insertInto("MySink2")
+stmtSet.addInsert("MySink2", table2)
 
-val explanation = tEnv.explain(false)
+val explanation = stmtSet.explain()
 println(explanation)
 
 {% endhighlight %}
@@ -1727,15 +1642,18 @@
     .with_format(Csv().deriveSchema())
     .with_schema(schema)
     .create_temporary_table("MySink2")
+
+stmt_set = t_env.create_statement_set()
 
 table1 = t_env.from_path("MySource1").where("LIKE(word, 'F%')")
-table1.insert_into("MySink1")
+stmt_set.add_insert("MySink1", table1)
 
 table2 = table1.union_all(t_env.from_path("MySource2"))
-table2.insert_into("MySink2")
+stmt_set.add_insert("MySink2", table2)
 
-explanation = t_env.explain()
+explanation = stmt_set.explain()
 print(explanation)
+
 {% endhighlight %}
 </div>
 </div>
diff --git a/docs/dev/table/common.zh.md b/docs/dev/table/common.zh.md
index 7bb7954..f295f39 100644
--- a/docs/dev/table/common.zh.md
+++ b/docs/dev/table/common.zh.md
@@ -35,7 +35,7 @@
 3. 旧计划器和 Blink 计划器中 `FilterableTableSource` 的实现是不兼容的。旧计划器会将 `PlannerExpression` 下推至 `FilterableTableSource`,而 Blink 计划器则是将 `Expression` 下推。
 4. 基于字符串的键值配置选项仅在 Blink 计划器中使用。(详情参见 [配置]({{ site.baseurl }}/zh/dev/table/config.html) )
 5. `PlannerConfig` 在两种计划器中的实现(`CalciteConfig`)是不同的。
-6. Blink 计划器会将多sink(multiple-sinks)优化成一张有向无环图(DAG)(仅支持 `TableEnvironment`,不支持 `StreamTableEnvironment`)。旧计划器总是将每个sink都优化成一个新的有向无环图,且所有图相互独立。
+6. Blink 计划器会将多 sink(multiple-sinks)优化成一张有向无环图(DAG),`TableEnvironment` 和 `StreamTableEnvironment` 都支持该特性。旧计划器总是将每个 sink 都优化成一个新的有向无环图,且所有图相互独立。
 7. 旧计划器目前不支持 catalog 统计数据,而 Blink 支持。
 
 
@@ -62,7 +62,8 @@
 Table sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table1 ... ");
 
 // emit a Table API result Table to a TableSink, same for SQL result
-tapiResult.insertInto("outputTable");
+TableResult tableResult = tapiResult.executeInsert("outputTable");
+tableResult...
 
 // execute
 tableEnv.execute("java_job");
@@ -87,7 +88,8 @@
 val sqlResult  = tableEnv.sqlQuery("SELECT ... FROM table1 ...")
 
 // emit a Table API result Table to a TableSink, same for SQL result
-tapiResult.insertInto("outputTable")
+TableResult tableResult = tapiResult.executeInsert("outputTable");
+tableResult...
 
 // execute
 tableEnv.execute("scala_job")
@@ -113,7 +115,8 @@
 sql_result  = table_env.sql_query("SELECT ... FROM table1 ...")
 
 # emit a Table API result Table to a TableSink, same for SQL result
-tapi_result.insert_into("outputTable")
+table_result = tapi_result.execute_insert("outputTable")
+table_result...
 
 # execute
 table_env.execute("python_job")
@@ -405,7 +408,7 @@
 
 <div data-lang="DDL" markdown="1">
 {% highlight sql %}
-tableEnvironment.sqlUpdate("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)")
+tableEnvironment.executeSql("CREATE [TEMPORARY] TABLE MyTable (...) WITH (...)")
 {% endhighlight %}
 </div>
 </div>
@@ -641,7 +644,7 @@
 // register "RevenueFrance" output table
 
 // compute revenue for all customers from France and emit to "RevenueFrance"
-tableEnv.sqlUpdate(
+tableEnv.executeSql(
     "INSERT INTO RevenueFrance " +
     "SELECT cID, cName, SUM(revenue) AS revSum " +
     "FROM Orders " +
@@ -649,7 +652,6 @@
     "GROUP BY cID, cName"
   );
 
-// execute query
 {% endhighlight %}
 </div>
 
@@ -662,7 +664,7 @@
 // register "RevenueFrance" output table
 
 // compute revenue for all customers from France and emit to "RevenueFrance"
-tableEnv.sqlUpdate("""
+tableEnv.executeSql("""
   |INSERT INTO RevenueFrance
   |SELECT cID, cName, SUM(revenue) AS revSum
   |FROM Orders
@@ -670,7 +672,6 @@
   |GROUP BY cID, cName
   """.stripMargin)
 
-// execute query
 {% endhighlight %}
 
 </div>
@@ -684,7 +685,7 @@
 # register "RevenueFrance" output table
 
 # compute revenue for all customers from France and emit to "RevenueFrance"
-table_env.sql_update(
+table_env.execute_sql(
     "INSERT INTO RevenueFrance "
     "SELECT cID, cName, SUM(revenue) AS revSum "
     "FROM Orders "
@@ -692,7 +693,6 @@
     "GROUP BY cID, cName"
 )
 
-# execute query
 {% endhighlight %}
 </div>
 </div>
@@ -717,7 +717,7 @@
 
 请参考文档 [Table Sources & Sinks]({{ site.baseurl }}/zh/dev/table/sourceSinks.html) 以获取更多关于可用 Sink 的信息以及如何自定义 `TableSink`。
 
-方法 `Table.insertInto(String tableName)` 将 `Table` 发送至已注册的 `TableSink`。该方法通过名称在 catalog 中查找 `TableSink` 并确认`Table` schema 和 `TableSink` schema 一致。
+方法 `Table.executeInsert(String tableName)` 将 `Table` 发送至已注册的 `TableSink`。该方法通过名称在 catalog 中查找 `TableSink` 并确认`Table` schema 和 `TableSink` schema 一致。
 
 下面的示例演示如何输出 `Table`:
 
@@ -741,9 +741,8 @@
 // compute a result Table using Table API operators and/or SQL queries
 Table result = ...
 // emit the result Table to the registered TableSink
-result.insertInto("CsvSinkTable");
+result.executeInsert("CsvSinkTable");
 
-// execute the program
 {% endhighlight %}
 </div>
 
@@ -767,9 +766,8 @@
 val result: Table = ...
 
 // emit the result Table to the registered TableSink
-result.insertInto("CsvSinkTable")
+result.executeInsert("CsvSinkTable")
 
-// execute the program
 {% endhighlight %}
 </div>
 
@@ -779,7 +777,7 @@
 table_env = ... # see "Create a TableEnvironment" section
 
 # create a TableSink
-t_env.connect(FileSystem().path("/path/to/file")))
+table_env.connect(FileSystem().path("/path/to/file")) \
     .with_format(Csv()
                  .field_delimiter(',')
                  .deriveSchema())
@@ -793,9 +791,8 @@
 result = ...
 
 # emit the result Table to the registered TableSink
-result.insert_into("CsvSinkTable")
+result.execute_insert("CsvSinkTable")
 
-# execute the program
 {% endhighlight %}
 </div>
 </div>
@@ -818,8 +815,13 @@
 
 Table API 或者 SQL 查询在下列情况下会被翻译:
 
-* 当 `TableEnvironment.execute()` 被调用时。`Table` (通过 `Table.insertInto()` 输出给 `TableSink`)和 SQL (通过调用 `TableEnvironment.sqlUpdate()`)会先被缓存到 `TableEnvironment` 中,所有的 sink 会被优化成一张有向无环图。
-* `Table` 被转换成 `DataStream` 时(参阅[与 DataStream 和 DataSet API 结合](#integration-with-datastream-and-dataset-api))。转换完成后,它就成为一个普通的 DataStream 程序,并且会在调用 `StreamExecutionEnvironment.execute()` 的时候被执行。
+* 当 `TableEnvironment.executeSql()` 被调用时。该方法用于执行一个 SQL 语句,一旦该方法被调用,SQL 语句立即被翻译。
+* 当 `Table.executeInsert()` 被调用时。该方法用于将一个表的内容插入到目标表中,一旦该方法被调用,Table API 程序立即被翻译。
+* 当 `Table.execute()` 被调用时。该方法用于将一个表的内容收集到本地,一旦该方法被调用,Table API 程序立即被翻译。
+* 当 `StatementSet.execute()` 被调用时。`Table`(通过 `StatementSet.addInsert()` 输出给某个 `Sink`)和 INSERT 语句(通过调用 `StatementSet.addInsertSql()`)会先被缓存到 `StatementSet` 中,`StatementSet.execute()` 方法被调用时,所有的 sink 会被优化成一张有向无环图(参见本列表之后的示例)。
+* 当 `Table` 被转换成 `DataStream` 时(参阅[与 DataStream 和 DataSet API 结合](#integration-with-datastream-and-dataset-api))。转换完成后,它就成为一个普通的 DataStream 程序,并会在调用 `StreamExecutionEnvironment.execute()` 时被执行。
+
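+例如,下面是一个简单的示例(假设 sink 表 `MySink1`、`MySink2` 和 `Table` 对象 `table` 均已存在,名称仅作示意),展示了语句先被缓存、直到调用 `execute()` 时才被翻译:
+
+{% highlight java %}
+StatementSet stmtSet = tableEnv.createStatementSet();
+
+// 此处尚未翻译,两条语句只是被缓存在 StatementSet 中
+stmtSet.addInsertSql("INSERT INTO MySink1 SELECT ...");
+stmtSet.addInsert("MySink2", table);
+
+// 调用 execute() 时,所有缓存的 sink 才会被翻译并优化成一张 DAG
+TableResult result = stmtSet.execute();
+{% endhighlight %}
+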
+<span class="label label-danger">注意</span> **从 1.11 版本开始,`sqlUpdate` 方法 和 `insertInto` 方法被废弃,从这两个方法构建的 Table 程序必须通过 `StreamTableEnvironment.execute()` 方法执行,而不能通过 `StreamExecutionEnvironment.execute()` 方法来执行。**
 </div>
 
 <div data-lang="Old planner" markdown="1">
@@ -828,18 +830,16 @@
 1. 优化逻辑执行计划
 2. 翻译成 DataStream 或 DataSet 程序
 
-对于 Streaming 而言,Table API 或者 SQL 查询在下列情况下会被翻译:
+Table API 或者 SQL 查询在下列情况下会被翻译:
 
-* 当 `TableEnvironment.execute()` 被调用时。`Table` (通过 `Table.insertInto()` 输出给 `TableSink`)和 SQL (通过调用 `TableEnvironment.sqlUpdate()`)会先被缓存到 `TableEnvironment` 中,每个 sink 会被单独优化。执行计划将包括多个独立的有向无环子图。
-* `Table` 被转换成 `DataStream` 时(参阅[与 DataStream 和 DataSet API 结合](#integration-with-datastream-and-dataset-api))。转换完成后,它就成为一个普通的 DataStream 程序,并且会在调用 `StreamExecutionEnvironment.execute()` 的时候被执行。
+* 当 `TableEnvironment.executeSql()` 被调用时。该方法用于执行一个 SQL 语句,一旦该方法被调用,SQL 语句立即被翻译。
+* 当 `Table.executeInsert()` 被调用时。该方法用于将一个表的内容插入到目标表中,一旦该方法被调用,Table API 程序立即被翻译。
+* 当 `Table.execute()` 被调用时。该方法用于将一个表的内容收集到本地,一旦该方法被调用,Table API 程序立即被翻译。
+* 当 `StatementSet.execute()` 被调用时。`Table`(通过 `StatementSet.addInsert()` 输出给某个 `Sink`)和 INSERT 语句(通过调用 `StatementSet.addInsertSql()`)会先被缓存到 `StatementSet` 中,`StatementSet.execute()` 方法被调用时才会被翻译。每个 sink 会被单独优化,执行计划将包含多个独立的有向无环子图。
+* 对于 Streaming 而言,当 `Table` 被转换成 `DataStream` 时(参阅[与 DataStream 和 DataSet API 结合](#integration-with-datastream-and-dataset-api))触发翻译。转换完成后,它就成为一个普通的 DataStream 程序,并会在调用 `StreamExecutionEnvironment.execute()` 时被执行。
+* 对于 Batch 而言,当 `Table` 被转换成 `DataSet` 时(参阅[与 DataStream 和 DataSet API 结合](#integration-with-datastream-and-dataset-api))触发翻译。转换完成后,它就成为一个普通的 DataSet 程序,并会在调用 `ExecutionEnvironment.execute()` 时被执行。
 
-对于 Batch 而言,Table API 或者 SQL 查询在下列情况下会被翻译:
+<span class="label label-danger">注意</span> **从 1.11 版本开始,`sqlUpdate` 方法 和 `insertInto` 方法被废弃。对于 Streaming 而言,如果一个 Table 程序是从这两个方法构建出来的,必须通过 `StreamTableEnvironment.execute()` 方法执行,而不能通过 `StreamExecutionEnvironment.execute()` 方法执行;对于 Batch 而言,如果一个 Table 程序是从这两个方法构建出来的,必须通过 `BatchTableEnvironment.execute()` 方法执行,而不能通过 `ExecutionEnvironment.execute()` 方法执行。**
 
-* `Table` 被输出给 `TableSink`,即当调用 `Table.insertInto()` 时。
-* SQL 更新语句执行时,即,当调用 `TableEnvironment.sqlUpdate()` 时。
-* `Table` 被转换成 `DataSet` 时(参阅[与 DataStream 和 DataSet API 结合](#integration-with-datastream-and-dataset-api))。
-
-翻译完成后,Table API 或者 SQL 查询会被当做普通的 DataSet 程序对待并且会在调用 `ExecutionEnvironment.execute()` 的时候被执行。
 </div>
 
 </div>
@@ -1023,6 +1023,8 @@
 
 **注意:** 文档[动态表](streaming/dynamic_tables.html)给出了有关动态表及其属性的详细讨论。
 
+<span class="label label-danger">注意</span> **一旦 Table 被转化为 DataStream,必须使用 StreamExecutionEnvironment 的 execute 方法执行该 DataStream 作业。**
+
 #### 将表转换成 DataSet
 
 将 `Table` 转换成 `DataSet` 的过程如下:
@@ -1066,6 +1068,8 @@
 </div>
 </div>
 
+<span class="label label-danger">注意</span> **一旦 Table 被转化为 DataSet,必须使用 ExecutionEnvironment 的 execute 方法执行该 DataSet 作业。**
+
 {% top %}
 
 ### 数据类型到 Table Schema 的映射
@@ -1417,16 +1421,17 @@
 </div>
 
 
-### 解释表
+解释表
+------------------
 
 Table API 提供了一种机制来解释计算 `Table` 的逻辑和优化查询计划。
-这是通过 `TableEnvironment.explain(table)` 或者 `TableEnvironment.explain()` 完成的。`explain(table)` 返回给定 `Table` 的计划。 `explain()` 返回多 sink 计划的结果并且主要用于 Blink 计划器。它返回一个描述三种计划的字符串:
+这是通过 `Table.explain()` 方法或者 `StatementSet.explain()` 方法来完成的。`Table.explain()` 返回一个 `Table` 的计划,`StatementSet.explain()` 返回多个 sink 的计划。返回的字符串描述了以下三种计划:
 
 1. 关系查询的抽象语法树(the Abstract Syntax Tree),即未优化的逻辑查询计划,
 2. 优化的逻辑查询计划,以及
 3. 物理执行计划。
 
-以下代码展示了一个示例以及对给定 `Table` 使用 `explain(table)` 的相应输出:
+以下代码展示了一个示例以及对给定 `Table` 使用 `Table.explain()` 方法的相应输出:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -1437,14 +1442,14 @@
 DataStream<Tuple2<Integer, String>> stream1 = env.fromElements(new Tuple2<>(1, "hello"));
 DataStream<Tuple2<Integer, String>> stream2 = env.fromElements(new Tuple2<>(1, "hello"));
 
+// explain Table API
 Table table1 = tEnv.fromDataStream(stream1, $("count"), $("word"));
 Table table2 = tEnv.fromDataStream(stream2, $("count"), $("word"));
 Table table = table1
   .where($("word").like("F%"))
   .unionAll(table2);
+System.out.println(table.explain());
 
-String explanation = tEnv.explain(table);
-System.out.println(explanation);
 {% endhighlight %}
 </div>
 
@@ -1458,9 +1463,8 @@
 val table = table1
   .where($"word".like("F%"))
   .unionAll(table2)
+println(table.explain())
 
-val explanation: String = tEnv.explain(table)
-println(explanation)
 {% endhighlight %}
 </div>
 
@@ -1474,15 +1478,14 @@
 table = table1 \
     .where("LIKE(word, 'F%')") \
     .union_all(table2)
+print(table.explain())
 
-explanation = t_env.explain(table)
-print(explanation)
 {% endhighlight %}
 </div>
 </div>
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
+上述例子的结果是:
+<div>
 {% highlight text %}
 == Abstract Syntax Tree ==
 LogicalUnion(all=[true])
@@ -1517,94 +1520,7 @@
 {% endhighlight %}
 </div>
 
-<div data-lang="scala" markdown="1">
-{% highlight text %}
-== Abstract Syntax Tree ==
-LogicalUnion(all=[true])
-  LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
-    FlinkLogicalDataStreamScan(id=[1], fields=[count, word])
-  FlinkLogicalDataStreamScan(id=[2], fields=[count, word])
-
-== Optimized Logical Plan ==
-DataStreamUnion(all=[true], union all=[count, word])
-  DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
-    DataStreamScan(id=[1], fields=[count, word])
-  DataStreamScan(id=[2], fields=[count, word])
-
-== Physical Execution Plan ==
-Stage 1 : Data Source
-	content : collect elements with CollectionInputFormat
-
-Stage 2 : Data Source
-	content : collect elements with CollectionInputFormat
-
-	Stage 3 : Operator
-		content : from: (count, word)
-		ship_strategy : REBALANCE
-
-		Stage 4 : Operator
-			content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
-			ship_strategy : FORWARD
-
-			Stage 5 : Operator
-				content : from: (count, word)
-				ship_strategy : REBALANCE
-{% endhighlight %}
-</div>
-
-<div data-lang="python" markdown="1">
-{% highlight text %}
-== Abstract Syntax Tree ==
-LogicalUnion(all=[true])
-  LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
-    FlinkLogicalDataStreamScan(id=[3], fields=[count, word])
-  FlinkLogicalDataStreamScan(id=[6], fields=[count, word])
-
-== Optimized Logical Plan ==
-DataStreamUnion(all=[true], union all=[count, word])
-  DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
-    DataStreamScan(id=[3], fields=[count, word])
-  DataStreamScan(id=[6], fields=[count, word])
-
-== Physical Execution Plan ==
-Stage 1 : Data Source
-	content : collect elements with CollectionInputFormat
-
-	Stage 2 : Operator
-		content : Flat Map
-		ship_strategy : FORWARD
-
-		Stage 3 : Operator
-			content : Map
-			ship_strategy : FORWARD
-
-Stage 4 : Data Source
-	content : collect elements with CollectionInputFormat
-
-	Stage 5 : Operator
-		content : Flat Map
-		ship_strategy : FORWARD
-
-		Stage 6 : Operator
-			content : Map
-			ship_strategy : FORWARD
-
-			Stage 7 : Operator
-				content : Map
-				ship_strategy : FORWARD
-
-				Stage 8 : Operator
-					content : where: (LIKE(word, _UTF-16LE'F%')), select: (count, word)
-					ship_strategy : FORWARD
-
-					Stage 9 : Operator
-						content : Map
-						ship_strategy : FORWARD
-{% endhighlight %}
-</div>
-</div>
-
-以下代码展示了一个示例以及使用 `explain()` 的多 sink 计划的相应输出:
+以下代码展示了一个示例以及使用 `StatementSet.explain()` 的多 sink 计划的相应输出:
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -1634,13 +1550,15 @@
     .withSchema(schema)
     .createTemporaryTable("MySink2");
 
+StatementSet stmtSet = tEnv.createStatementSet();
+
 Table table1 = tEnv.from("MySource1").where($("word").like("F%"));
-table1.insertInto("MySink1");
+stmtSet.addInsert("MySink1", table1);
 
 Table table2 = table1.unionAll(tEnv.from("MySource2"));
-table2.insertInto("MySink2");
+stmtSet.addInsert("MySink2", table2);
 
-String explanation = tEnv.explain(false);
+String explanation = stmtSet.explain();
 System.out.println(explanation);
 
 {% endhighlight %}
@@ -1672,13 +1590,15 @@
     .withSchema(schema)
     .createTemporaryTable("MySink2")
 
+val stmtSet = tEnv.createStatementSet()
+
 val table1 = tEnv.from("MySource1").where($"word".like("F%"))
-table1.insertInto("MySink1")
+stmtSet.addInsert("MySink1", table1)
 
 val table2 = table1.unionAll(tEnv.from("MySource2"))
-table2.insertInto("MySink2")
+stmtSet.addInsert("MySink2", table2)
 
-val explanation = tEnv.explain(false)
+val explanation = stmtSet.explain()
 println(explanation)
 
 {% endhighlight %}
@@ -1710,14 +1630,17 @@
     .with_schema(schema)
     .create_temporary_table("MySink2")
 
+stmt_set = t_env.create_statement_set()
+
 table1 = t_env.from_path("MySource1").where("LIKE(word, 'F%')")
-table1.insert_into("MySink1")
+stmt_set.add_insert("MySink1", table1)
 
 table2 = table1.union_all(t_env.from_path("MySource2"))
-table2.insert_into("MySink2")
+stmt_set.add_insert("MySink2", table2)
 
-explanation = t_env.explain()
+explanation = stmt_set.explain()
 print(explanation)
+
 {% endhighlight %}
 </div>
 </div>
diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md
index 0519752..7aa151b 100644
--- a/docs/dev/table/connect.md
+++ b/docs/dev/table/connect.md
@@ -91,7 +91,7 @@
 <div class="codetabs" markdown="1">
 <div data-lang="DDL" markdown="1">
 {% highlight sql %}
-tableEnvironment.sqlUpdate(
+tableEnvironment.executeSql(
     "CREATE TABLE MyTable (\n" +
     "  ...    -- declare table schema \n" +
     ") WITH (\n" +
@@ -2078,7 +2078,7 @@
   sink);
 
 Table table = ...
-table.insertInto("csvOutputTable");
+table.executeInsert("csvOutputTable");
 {% endhighlight %}
 </div>
 
@@ -2099,7 +2099,7 @@
   sink)
 
 val table: Table = ???
-table.insertInto("csvOutputTable")
+table.executeInsert("csvOutputTable")
 {% endhighlight %}
 </div>
 
diff --git a/docs/dev/table/connect.zh.md b/docs/dev/table/connect.zh.md
index 41d87a1..622e04f 100644
--- a/docs/dev/table/connect.zh.md
+++ b/docs/dev/table/connect.zh.md
@@ -91,7 +91,7 @@
 <div class="codetabs" markdown="1">
 <div data-lang="DDL" markdown="1">
 {% highlight sql %}
-tableEnvironment.sqlUpdate(
+tableEnvironment.executeSql(
     "CREATE TABLE MyTable (\n" +
     "  ...    -- declare table schema \n" +
     ") WITH (\n" +
@@ -2074,7 +2074,7 @@
   sink);
 
 Table table = ...
-table.insertInto("csvOutputTable");
+table.executeInsert("csvOutputTable");
 {% endhighlight %}
 </div>
 
@@ -2095,7 +2095,7 @@
   sink)
 
 val table: Table = ???
-table.insertInto("csvOutputTable")
+table.executeInsert("csvOutputTable")
 {% endhighlight %}
 </div>
 
diff --git a/docs/dev/table/sql/alter.md b/docs/dev/table/sql/alter.md
index 82ae467..61d9098 100644
--- a/docs/dev/table/sql/alter.md
+++ b/docs/dev/table/sql/alter.md
@@ -35,7 +35,7 @@
 
 ## Run an ALTER statement
 
-ALTER statements can be executed with the `sqlUpdate()` method of the `TableEnvironment`, or executed in [SQL CLI]({{ site.baseurl }}/dev/table/sqlClient.html). The `sqlUpdate()` method returns nothing for a successful ALTER operation, otherwise will throw an exception.
+ALTER statements can be executed with the `executeSql()` method of the `TableEnvironment`, or executed in [SQL CLI]({{ site.baseurl }}/dev/table/sqlClient.html). The `executeSql()` method returns 'OK' for a successful ALTER operation, otherwise it will throw an exception.
 
 The following examples show how to run an ALTER statement in `TableEnvironment` and in SQL CLI.
 
@@ -46,16 +46,18 @@
 TableEnvironment tableEnv = TableEnvironment.create(settings);
 
 // register a table named "Orders"
-tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
+tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
 
 // a string array: ["Orders"]
-String[] tables = tableEnv.listTable();
+String[] tables = tableEnv.listTables();
+// or tableEnv.executeSql("SHOW TABLES").print();
 
 // rename "Orders" to "NewOrders"
-tableEnv.sqlUpdate("ALTER TABLE Orders RENAME TO NewOrders;");
+tableEnv.executeSql("ALTER TABLE Orders RENAME TO NewOrders;");
 
 // a string array: ["NewOrders"]
-String[] tables = tableEnv.listTable();
+String[] tables = tableEnv.listTables();
+// or tableEnv.executeSql("SHOW TABLES").print();
 {% endhighlight %}
 </div>
 
@@ -65,32 +67,36 @@
 val tableEnv = TableEnvironment.create(settings)
 
 // register a table named "Orders"
-tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
+tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
 
 // a string array: ["Orders"]
-val tables = tableEnv.listTable()
+val tables = tableEnv.listTables()
+// or tableEnv.executeSql("SHOW TABLES").print()
 
 // rename "Orders" to "NewOrders"
-tableEnv.sqlUpdate("ALTER TABLE Orders RENAME TO NewOrders;")
+tableEnv.executeSql("ALTER TABLE Orders RENAME TO NewOrders;")
 
 // a string array: ["NewOrders"]
-val tables = tableEnv.listTable()
+val tables = tableEnv.listTables()
+// or tableEnv.executeSql("SHOW TABLES").print()
 {% endhighlight %}
 </div>
 
 <div data-lang="python" markdown="1">
 {% highlight python %}
-settings = EnvironmentSettings.newInstance()...
-table_env = TableEnvironment.create(settings)
+settings = EnvironmentSettings.new_instance()...
+table_env = StreamTableEnvironment.create(env, settings)
 
 # a string array: ["Orders"]
-tables = tableEnv.listTable()
+tables = table_env.list_tables()
+# or table_env.execute_sql("SHOW TABLES").print()
 
 # rename "Orders" to "NewOrders"
-tableEnv.sqlUpdate("ALTER TABLE Orders RENAME TO NewOrders;")
+table_env.execute_sql("ALTER TABLE Orders RENAME TO NewOrders;")
 
 # a string array: ["NewOrders"]
-tables = tableEnv.listTable()
+tables = table_env.list_tables()
+# or table_env.execute_sql("SHOW TABLES").print()
 {% endhighlight %}
 </div>
 
diff --git a/docs/dev/table/sql/alter.zh.md b/docs/dev/table/sql/alter.zh.md
index 16e4cda..7d3eed1 100644
--- a/docs/dev/table/sql/alter.zh.md
+++ b/docs/dev/table/sql/alter.zh.md
@@ -35,7 +35,7 @@
 
 ## 执行 ALTER 语句
 
-可以使用 `TableEnvironment` 中的 `sqlUpdate()` 方法执行 ALTER 语句,也可以在 [SQL CLI]({{ site.baseurl }}/zh/dev/table/sqlClient.html) 中执行 ALTER 语句。 若 ALTER 操作执行成功,`sqlUpdate()` 方法不返回任何内容,否则会抛出异常。
+可以使用 `TableEnvironment` 中的 `executeSql()` 方法执行 ALTER 语句,也可以在 [SQL CLI]({{ site.baseurl }}/zh/dev/table/sqlClient.html) 中执行 ALTER 语句。 若 ALTER 操作执行成功,`executeSql()` 方法返回 'OK',否则会抛出异常。
 
 以下的例子展示了如何在 `TableEnvironment` 和  SQL CLI 中执行一个 ALTER 语句。
 
@@ -46,16 +46,18 @@
 TableEnvironment tableEnv = TableEnvironment.create(settings);
 
 // 注册名为 “Orders” 的表
-tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
+tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
 
 // 字符串数组: ["Orders"]
-String[] tables = tableEnv.listTable();
+String[] tables = tableEnv.listTables();
+// or tableEnv.executeSql("SHOW TABLES").print();
 
 // 把 “Orders” 的表名改为 “NewOrders”
-tableEnv.sqlUpdate("ALTER TABLE Orders RENAME TO NewOrders;");
+tableEnv.executeSql("ALTER TABLE Orders RENAME TO NewOrders;");
 
 // 字符串数组:["NewOrders"]
-String[] tables = tableEnv.listTable();
+String[] tables = tableEnv.listTables();
+// or tableEnv.executeSql("SHOW TABLES").print();
 {% endhighlight %}
 </div>
 
@@ -65,32 +67,36 @@
 val tableEnv = TableEnvironment.create(settings)
 
 // 注册名为 “Orders” 的表
-tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
+tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
 
 // 字符串数组: ["Orders"]
-val tables = tableEnv.listTable()
+val tables = tableEnv.listTables()
+// or tableEnv.executeSql("SHOW TABLES").print()
 
 // 把 “Orders” 的表名改为 “NewOrders”
-tableEnv.sqlUpdate("ALTER TABLE Orders RENAME TO NewOrders;")
+tableEnv.executeSql("ALTER TABLE Orders RENAME TO NewOrders;")
 
 // 字符串数组:["NewOrders"]
-val tables = tableEnv.listTable()
+val tables = tableEnv.listTables()
+// or tableEnv.executeSql("SHOW TABLES").print()
 {% endhighlight %}
 </div>
 
 <div data-lang="python" markdown="1">
 {% highlight python %}
-settings = EnvironmentSettings.newInstance()...
-table_env = TableEnvironment.create(settings)
+settings = EnvironmentSettings.new_instance()...
+table_env = StreamTableEnvironment.create(env, settings)
 
 # 字符串数组: ["Orders"]
-tables = tableEnv.listTable()
+tables = table_env.list_tables()
+# or table_env.execute_sql("SHOW TABLES").print()
 
 # 把 “Orders” 的表名改为 “NewOrders”
-tableEnv.sqlUpdate("ALTER TABLE Orders RENAME TO NewOrders;")
+table_env.execute_sql("ALTER TABLE Orders RENAME TO NewOrders;")
 
 # 字符串数组:["NewOrders"]
-tables = tableEnv.listTable()
+tables = table_env.list_tables()
+# or table_env.execute_sql("SHOW TABLES").print()
 {% endhighlight %}
 </div>
 
diff --git a/docs/dev/table/sql/create.md b/docs/dev/table/sql/create.md
index 00eb2ca..4bc398c 100644
--- a/docs/dev/table/sql/create.md
+++ b/docs/dev/table/sql/create.md
@@ -36,7 +36,7 @@
 
 ## Run a CREATE statement
 
-CREATE statements can be executed with the `sqlUpdate()` method of the `TableEnvironment`, or executed in [SQL CLI]({{ site.baseurl }}/dev/table/sqlClient.html). The `sqlUpdate()` method returns nothing for a successful CREATE operation, otherwise will throw an exception.
+CREATE statements can be executed with the `executeSql()` method of the `TableEnvironment`, or executed in [SQL CLI]({{ site.baseurl }}/dev/table/sqlClient.html). The `executeSql()` method returns 'OK' for a successful CREATE operation, otherwise it will throw an exception.
 
 The following examples show how to run a CREATE statement in `TableEnvironment` and in SQL CLI.
 
@@ -48,16 +48,16 @@
 
 // SQL query with a registered table
 // register a table named "Orders"
-tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
+tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
 // run a SQL query on the Table and retrieve the result as a new Table
 Table result = tableEnv.sqlQuery(
   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
 
-// SQL update with a registered table
+// execute an INSERT SQL with a registered table
 // register a TableSink
-tableEnv.sqlUpdate("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)");
-// run a SQL update query on the Table and emit the result to the TableSink
-tableEnv.sqlUpdate(
+tableEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)");
+// run an INSERT SQL on the Table and emit the result to the TableSink
+tableEnv.executeSql(
   "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
 {% endhighlight %}
 </div>
@@ -69,38 +69,38 @@
 
 // SQL query with a registered table
 // register a table named "Orders"
-tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
+tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
 // run a SQL query on the Table and retrieve the result as a new Table
 val result = tableEnv.sqlQuery(
   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
 
-// SQL update with a registered table
+// execute an INSERT SQL with a registered table
 // register a TableSink
-tableEnv.sqlUpdate("CREATE TABLE RubberOrders(product STRING, amount INT) WITH ('connector.path'='/path/to/file' ...)");
-// run a SQL update query on the Table and emit the result to the TableSink
-tableEnv.sqlUpdate(
+tableEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH ('connector.path'='/path/to/file' ...)");
+// run an INSERT SQL on the Table and emit the result to the TableSink
+tableEnv.executeSql(
   "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
 {% endhighlight %}
 </div>
 
 <div data-lang="python" markdown="1">
 {% highlight python %}
-settings = EnvironmentSettings.newInstance()...
-table_env = TableEnvironment.create(settings)
+settings = EnvironmentSettings.new_instance()...
+table_env = StreamTableEnvironment.create(env, settings)
 
 # SQL query with a registered table
 # register a table named "Orders"
-tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
+table_env.execute_sql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
 # run a SQL query on the Table and retrieve the result as a new Table
-result = tableEnv.sqlQuery(
+result = table_env.sql_query(
   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
 
-# SQL update with a registered table
+# Execute an INSERT SQL with a registered table
 # register a TableSink
-table_env.sql_update("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")
-# run a SQL update query on the Table and emit the result to the TableSink
+table_env.execute_sql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")
+# run an INSERT SQL on the Table and emit the result to the TableSink
 table_env \
-    .sql_update("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+    .execute_sql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
 {% endhighlight %}
 </div>
 
diff --git a/docs/dev/table/sql/create.zh.md b/docs/dev/table/sql/create.zh.md
index 34c9bc8..027debf 100644
--- a/docs/dev/table/sql/create.zh.md
+++ b/docs/dev/table/sql/create.zh.md
@@ -36,7 +36,7 @@
 
 ## Run a CREATE statement
 
-CREATE statements can be executed with the `sqlUpdate()` method of the `TableEnvironment`, or in [SQL CLI]({{ site.baseurl }}/zh/dev/table/sqlClient.html). The `sqlUpdate()` method returns nothing for a successful CREATE operation and throws an exception otherwise.
+CREATE statements can be executed with the `executeSql()` method of the `TableEnvironment`, or in [SQL CLI]({{ site.baseurl }}/zh/dev/table/sqlClient.html). The `executeSql()` method returns 'OK' for a successful CREATE operation and throws an exception otherwise.
 
 The following examples show how to run a CREATE statement in `TableEnvironment` and in SQL CLI.
 
@@ -46,18 +46,18 @@
 EnvironmentSettings settings = EnvironmentSettings.newInstance()...
 TableEnvironment tableEnv = TableEnvironment.create(settings);
 
-// run a SQL query on an already-registered table
+// run a SQL query on a registered table
 // register a table named "Orders"
-tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
+tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
 // run a SQL query on the Table and retrieve the result as a new Table
 Table result = tableEnv.sqlQuery(
   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
 
-// run a SQL update on a registered table
+// run an INSERT operation on a registered table
 // register a TableSink
-tableEnv.sqlUpdate("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)");
-// run a SQL update query on the Table and emit the result to the TableSink
-tableEnv.sqlUpdate(
+tableEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)");
+// run an INSERT statement on the Table and emit the result to the TableSink
+tableEnv.executeSql(
   "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
 {% endhighlight %}
 </div>
@@ -67,40 +67,40 @@
 val settings = EnvironmentSettings.newInstance()...
 val tableEnv = TableEnvironment.create(settings)
 
-// run a SQL query on an already-registered table
+// run a SQL query on a registered table
 // register a table named "Orders"
-tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
+tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
 // run a SQL query on the Table and retrieve the result as a new Table
 val result = tableEnv.sqlQuery(
   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
 
-// run a SQL update on a registered table
+// run an INSERT operation on a registered table
 // register a TableSink
-tableEnv.sqlUpdate("CREATE TABLE RubberOrders(product STRING, amount INT) WITH ('connector.path'='/path/to/file' ...)");
-// run a SQL update query on the Table and emit the result to the TableSink
-tableEnv.sqlUpdate(
+tableEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH ('connector.path'='/path/to/file' ...)");
+// run an INSERT statement on the Table and emit the result to the TableSink
+tableEnv.executeSql(
   "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
 {% endhighlight %}
 </div>
 
 <div data-lang="python" markdown="1">
 {% highlight python %}
-settings = EnvironmentSettings.newInstance()...
-table_env = TableEnvironment.create(settings)
+settings = EnvironmentSettings.new_instance()...
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env, settings)
 
-# run a SQL query on an already-registered table
+# run a SQL query on a registered table
 # register a table named "Orders"
-tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
+table_env.execute_sql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
 # run a SQL query on the Table and retrieve the result as a new Table
-result = tableEnv.sqlQuery(
+result = table_env.sql_query(
   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
 
-# run a SQL update on a registered table
+# run an INSERT operation on a registered table
 # register a TableSink
-table_env.sql_update("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")
-# run a SQL update query on the Table and emit the result to the TableSink
+table_env.execute_sql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")
+# run an INSERT statement on the Table and emit the result to the TableSink
 table_env \
-    .sql_update("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+    .execute_sql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
 {% endhighlight %}
 </div>
 
diff --git a/docs/dev/table/sql/drop.md b/docs/dev/table/sql/drop.md
index bcaea0b..c12105f 100644
--- a/docs/dev/table/sql/drop.md
+++ b/docs/dev/table/sql/drop.md
@@ -36,7 +36,7 @@
 
 ## Run a DROP statement
 
-DROP statements can be executed with the `sqlUpdate()` method of the `TableEnvironment`, or executed in [SQL CLI]({{ site.baseurl }}/dev/table/sqlClient.html). The `sqlUpdate()` method returns nothing for a successful DROP operation, otherwise will throw an exception.
+DROP statements can be executed with the `executeSql()` method of the `TableEnvironment`, or executed in [SQL CLI]({{ site.baseurl }}/dev/table/sqlClient.html). The `executeSql()` method returns 'OK' for a successful DROP operation and throws an exception otherwise.
 
 The following examples show how to run a DROP statement in `TableEnvironment` and in SQL CLI.
 
@@ -47,16 +47,18 @@
 TableEnvironment tableEnv = TableEnvironment.create(settings);
 
 // register a table named "Orders"
-tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
+tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
 
 // a string array: ["Orders"]
-String[] tables = tableEnv.listTable();
+String[] tables = tableEnv.listTables();
+// or tableEnv.executeSql("SHOW TABLES").print();
 
 // drop "Orders" table from catalog
-tableEnv.sqlUpdate("DROP TABLE Orders");
+tableEnv.executeSql("DROP TABLE Orders");
 
 // an empty string array
-String[] tables = tableEnv.listTable();
+String[] tables = tableEnv.listTables();
+// or tableEnv.executeSql("SHOW TABLES").print();
 {% endhighlight %}
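+
+Because a plain `DROP TABLE` throws an exception when the table is absent, the `IF EXISTS` clause can serve as a guard. A brief sketch of the guarded form:
+
+{% highlight java %}
+// no exception is thrown even if "Orders" has already been dropped
+tableEnv.executeSql("DROP TABLE IF EXISTS Orders");
+{% endhighlight %}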
 </div>
 
@@ -66,32 +68,36 @@
 val tableEnv = TableEnvironment.create(settings)
 
 // register a table named "Orders"
-tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
+tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
 
 // a string array: ["Orders"]
-val tables = tableEnv.listTable()
+val tables = tableEnv.listTables()
+// or tableEnv.executeSql("SHOW TABLES").print()
 
 // drop "Orders" table from catalog
-tableEnv.sqlUpdate("DROP TABLE Orders")
+tableEnv.executeSql("DROP TABLE Orders")
 
 // an empty string array
-val tables = tableEnv.listTable()
+val tables = tableEnv.listTables()
+// or tableEnv.executeSql("SHOW TABLES").print()
 {% endhighlight %}
 </div>
 
 <div data-lang="python" markdown="1">
 {% highlight python %}
-settings = EnvironmentSettings.newInstance()...
-table_env = TableEnvironment.create(settings)
+settings = EnvironmentSettings.new_instance()...
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env, settings)
 
 # a string array: ["Orders"]
-tables = tableEnv.listTable()
+tables = table_env.list_tables()
+# or table_env.execute_sql("SHOW TABLES").print()
 
 # drop "Orders" table from catalog
-tableEnv.sqlUpdate("DROP TABLE Orders")
+table_env.execute_sql("DROP TABLE Orders")
 
 # an empty string array
-tables = tableEnv.listTable()
+tables = table_env.list_tables()
+# or table_env.execute_sql("SHOW TABLES").print()
 {% endhighlight %}
 </div>
 
diff --git a/docs/dev/table/sql/drop.zh.md b/docs/dev/table/sql/drop.zh.md
index 8d1b76d..911ae97 100644
--- a/docs/dev/table/sql/drop.zh.md
+++ b/docs/dev/table/sql/drop.zh.md
@@ -36,7 +36,7 @@
 
 ## Run a DROP statement
 
-DROP statements can be executed with the `sqlUpdate()` method of the `TableEnvironment`, or in [SQL CLI]({{ site.baseurl }}/zh/dev/table/sqlClient.html). The `sqlUpdate()` method returns nothing for a successful DROP operation and throws an exception otherwise.
+DROP statements can be executed with the `executeSql()` method of the `TableEnvironment`, or in [SQL CLI]({{ site.baseurl }}/zh/dev/table/sqlClient.html). The `executeSql()` method returns 'OK' for a successful DROP operation and throws an exception otherwise.
 
 The following examples show how to run a DROP statement in `TableEnvironment` and in SQL CLI.
 
@@ -47,16 +47,18 @@
 TableEnvironment tableEnv = TableEnvironment.create(settings);
 
 // register a table named "Orders"
-tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
+tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
 
 // a string array: ["Orders"]
-String[] tables = tableEnv.listTable();
+String[] tables = tableEnv.listTables();
+// or tableEnv.executeSql("SHOW TABLES").print();
 
 // drop the "Orders" table from the catalog
-tableEnv.sqlUpdate("DROP TABLE Orders");
+tableEnv.executeSql("DROP TABLE Orders");
 
 // an empty string array
-String[] tables = tableEnv.listTable();
+String[] tables = tableEnv.listTables();
+// or tableEnv.executeSql("SHOW TABLES").print();
 {% endhighlight %}
 </div>
 
@@ -66,32 +68,36 @@
 val tableEnv = TableEnvironment.create(settings)
 
 // register a table named "Orders"
-tableEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
+tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
 
 // a string array: ["Orders"]
-val tables = tableEnv.listTable()
+val tables = tableEnv.listTables()
+// or tableEnv.executeSql("SHOW TABLES").print()
 
 // drop the "Orders" table from the catalog
-tableEnv.sqlUpdate("DROP TABLE Orders")
+tableEnv.executeSql("DROP TABLE Orders")
 
 // an empty string array
-val tables = tableEnv.listTable()
+val tables = tableEnv.listTables()
+// or tableEnv.executeSql("SHOW TABLES").print()
 {% endhighlight %}
 </div>
 
 <div data-lang="python" markdown="1">
 {% highlight python %}
-settings = EnvironmentSettings.newInstance()...
-table_env = TableEnvironment.create(settings)
+settings = EnvironmentSettings.new_instance()...
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env, settings)
 
 # a string array: ["Orders"]
-tables = tableEnv.listTable()
+tables = table_env.list_tables()
+# or table_env.execute_sql("SHOW TABLES").print()
 
 # drop the "Orders" table from the catalog
-tableEnv.sqlUpdate("DROP TABLE Orders")
+table_env.execute_sql("DROP TABLE Orders")
 
 # an empty string array
-tables = tableEnv.listTable()
+tables = table_env.list_tables()
+# or table_env.execute_sql("SHOW TABLES").print()
 {% endhighlight %}
 </div>
 
diff --git a/docs/dev/table/sql/index.zh.md b/docs/dev/table/sql/index.zh.md
index b93f4ff..70130e8 100644
--- a/docs/dev/table/sql/index.zh.md
+++ b/docs/dev/table/sql/index.zh.md
@@ -28,7 +28,7 @@
 
 This page lists all the statements currently supported by Flink SQL:
 
-- [SELECT (查询)](queries.html)
+- [SELECT (Queries)](queries.html)
 - [CREATE TABLE, DATABASE, VIEW, FUNCTION](create.html)
 - [DROP TABLE, DATABASE, VIEW, FUNCTION](drop.html)
 - [ALTER TABLE, DATABASE, FUNCTION](alter.html)
diff --git a/docs/dev/table/sql/insert.md b/docs/dev/table/sql/insert.md
index 96052ad..01fa413 100644
--- a/docs/dev/table/sql/insert.md
+++ b/docs/dev/table/sql/insert.md
@@ -29,9 +29,10 @@
 
 ## Run an INSERT statement
 
-INSERT statements are specified with the `sqlUpdate()` method of the `TableEnvironment` or executed in [SQL CLI]({{ site.baseurl }}/dev/table/sqlClient.html). The method `sqlUpdate()` for INSERT statements is a lazy execution, they will be executed only when `TableEnvironment.execute(jobName)` is invoked.
+A single INSERT statement can be executed through the `executeSql()` method of the `TableEnvironment`, or in [SQL CLI]({{ site.baseurl }}/dev/table/sqlClient.html). The `executeSql()` method submits a Flink job immediately for an INSERT statement and returns a `TableResult` instance that is associated with the submitted job.
+Multiple INSERT statements can be executed through the `addInsertSql()` method of a `StatementSet`, which is created by the `TableEnvironment.createStatementSet()` method. The `addInsertSql()` method is lazy: the added statements are executed only when `StatementSet.execute()` is invoked.
 
-The following examples show how to run an INSERT statement in `TableEnvironment` and in SQL CLI.
+The following examples show how to run a single INSERT statement in `TableEnvironment` and in SQL CLI, and how to run multiple INSERT statements with a `StatementSet`.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -40,12 +41,31 @@
 TableEnvironment tEnv = TableEnvironment.create(settings);
 
 // register a source table named "Orders" and a sink table named "RubberOrders"
-tEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product VARCHAR, amount INT) WITH (...)");
-tEnv.sqlUpdate("CREATE TABLE RubberOrders(product VARCHAR, amount INT) WITH (...)");
+tEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product VARCHAR, amount INT) WITH (...)");
+tEnv.executeSql("CREATE TABLE RubberOrders(product VARCHAR, amount INT) WITH (...)");
 
-// run a SQL update query on the registered source table and emit the result to registered sink table
-tEnv.sqlUpdate(
+// run a single INSERT query on the registered source table and emit the result to registered sink table
+TableResult tableResult1 = tEnv.executeSql(
   "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
+// get job status through TableResult
+System.out.println(tableResult1.getJobClient().get().getJobStatus());
+
+//----------------------------------------------------------------------------
+// register another sink table named "GlassOrders" for multiple INSERT queries
+tEnv.executeSql("CREATE TABLE GlassOrders(product VARCHAR, amount INT) WITH (...)");
+
+// run multiple INSERT queries on the registered source table and emit the result to registered sink tables
+StatementSet stmtSet = tEnv.createStatementSet();
+// only a single INSERT statement is accepted by the `addInsertSql` method
+stmtSet.addInsertSql(
+  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
+stmtSet.addInsertSql(
+  "INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'");
+// execute all statements together
+TableResult tableResult2 = stmtSet.execute();
+// get job status through TableResult
+System.out.println(tableResult2.getJobClient().get().getJobStatus());
+
 {% endhighlight %}
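+
+Note that `JobClient.getJobStatus()` returns a `CompletableFuture<JobStatus>`, so the statements above print the future itself. A small sketch of blocking for the actual status, assuming waiting on the future is acceptable:
+
+{% highlight java %}
+// join() blocks until the CompletableFuture<JobStatus> completes
+JobStatus status = tableResult1.getJobClient().get().getJobStatus().join();
+System.out.println(status);
+{% endhighlight %}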
 </div>
 
@@ -55,27 +75,65 @@
 val tEnv = TableEnvironment.create(settings)
 
 // register a source table named "Orders" and a sink table named "RubberOrders"
-tEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
-tEnv.sqlUpdate("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")
+tEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
+tEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")
 
-// run a SQL update query on the registered source table and emit the result to registered sink table
-tEnv.sqlUpdate(
+// run a single INSERT query on the registered source table and emit the result to registered sink table
+val tableResult1 = tEnv.executeSql(
   "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+// get job status through TableResult
+println(tableResult1.getJobClient().get().getJobStatus())
+
+//----------------------------------------------------------------------------
+// register another sink table named "GlassOrders" for multiple INSERT queries
+tEnv.executeSql("CREATE TABLE GlassOrders(product VARCHAR, amount INT) WITH (...)")
+
+// run multiple INSERT queries on the registered source table and emit the result to registered sink tables
+val stmtSet = tEnv.createStatementSet()
+// only a single INSERT statement is accepted by the `addInsertSql` method
+stmtSet.addInsertSql(
+  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+stmtSet.addInsertSql(
+  "INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'")
+// execute all statements together
+val tableResult2 = stmtSet.execute()
+// get job status through TableResult
+println(tableResult2.getJobClient().get().getJobStatus())
+
 {% endhighlight %}
 </div>
 
 <div data-lang="python" markdown="1">
 {% highlight python %}
-settings = EnvironmentSettings.newInstance()...
-table_env = TableEnvironment.create(settings)
+settings = EnvironmentSettings.new_instance()...
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env, settings)
 
 # register a source table named "Orders" and a sink table named "RubberOrders"
-table_env.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
-table_env.sqlUpdate("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")
+table_env.execute_sql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
+table_env.execute_sql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")
 
-# run a SQL update query on the registered source table and emit the result to registered sink table
-table_env \
-    .sqlUpdate("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+# run a single INSERT query on the registered source table and emit the result to registered sink table
+table_result1 = table_env \
+    .execute_sql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+# get job status through TableResult
+print(table_result1.get_job_client().get_job_status())
+
+#----------------------------------------------------------------------------
+# register another sink table named "GlassOrders" for multiple INSERT queries
+table_env.execute_sql("CREATE TABLE GlassOrders(product VARCHAR, amount INT) WITH (...)")
+
+# run multiple INSERT queries on the registered source table and emit the result to registered sink tables
+stmt_set = table_env.create_statement_set()
+# only a single INSERT statement is accepted by the `add_insert_sql` method
+stmt_set \
+    .add_insert_sql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+stmt_set \
+    .add_insert_sql("INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'")
+# execute all statements together
+table_result2 = stmt_set.execute()
+# get job status through TableResult
+print(table_result2.get_job_client().get_job_status())
+
 {% endhighlight %}
 </div>
 
diff --git a/docs/dev/table/sql/insert.zh.md b/docs/dev/table/sql/insert.zh.md
index 14ad9a4..863447c 100644
--- a/docs/dev/table/sql/insert.zh.md
+++ b/docs/dev/table/sql/insert.zh.md
@@ -29,9 +29,10 @@
 
 ## Run an INSERT statement
 
-INSERT statements can be executed with the `sqlUpdate()` method of the `TableEnvironment`, or in [SQL CLI]({{ site.baseurl }}/zh/dev/table/sqlClient.html). `sqlUpdate()` executes INSERT statements lazily; they are run only when `TableEnvironment.execute(jobName)` is invoked.
+A single INSERT statement can be executed through the `executeSql()` method of the `TableEnvironment`, or in [SQL CLI]({{ site.baseurl }}/zh/dev/table/sqlClient.html). The `executeSql()` method submits a Flink job immediately for an INSERT statement and returns a `TableResult` object, from which a `JobClient` can be obtained to conveniently operate on the submitted job.
+For multiple INSERT statements, create a `StatementSet` with `TableEnvironment.createStatementSet()`, add the statements with the `addInsertSql()` method of the `StatementSet`, and finally run them with the `execute()` method of the `StatementSet`.
 
-The following examples show how to run an INSERT statement in `TableEnvironment` and in SQL CLI.
+The following examples show how to run a single INSERT statement in `TableEnvironment` and in SQL CLI, and how to run multiple INSERT statements with a `StatementSet`.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -40,12 +41,31 @@
 TableEnvironment tEnv = TableEnvironment.create(settings);
 
 // register a source table named "Orders" and a sink table named "RubberOrders"
-tEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product VARCHAR, amount INT) WITH (...)");
-tEnv.sqlUpdate("CREATE TABLE RubberOrders(product VARCHAR, amount INT) WITH (...)");
+tEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product VARCHAR, amount INT) WITH (...)");
+tEnv.executeSql("CREATE TABLE RubberOrders(product VARCHAR, amount INT) WITH (...)");
 
-// run an INSERT statement and emit the source table data to the sink table
-tEnv.sqlUpdate(
+// run a single INSERT statement and emit the source table data to the sink table
+TableResult tableResult1 = tEnv.executeSql(
   "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
+// get the job status through the TableResult
+System.out.println(tableResult1.getJobClient().get().getJobStatus());
+
+//----------------------------------------------------------------------------
+// register another sink table named "GlassOrders" for multiple INSERT statements
+tEnv.executeSql("CREATE TABLE GlassOrders(product VARCHAR, amount INT) WITH (...)");
+
+// run multiple INSERT statements and emit the source table data to multiple sink tables
+StatementSet stmtSet = tEnv.createStatementSet();
+// the `addInsertSql` method accepts only a single INSERT statement at a time
+stmtSet.addInsertSql(
+  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
+stmtSet.addInsertSql(
+  "INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'");
+// execute all the INSERT statements just added
+TableResult tableResult2 = stmtSet.execute();
+// get the job status through the TableResult
+System.out.println(tableResult2.getJobClient().get().getJobStatus());
+
 {% endhighlight %}
 </div>
 
@@ -55,27 +75,66 @@
 val tEnv = TableEnvironment.create(settings)
 
 // register a source table named "Orders" and a sink table named "RubberOrders"
-tEnv.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
-tEnv.sqlUpdate("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")
+tEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
+tEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")
 
 // run an INSERT statement and emit the source table data to the sink table
-tEnv.sqlUpdate(
+val tableResult1 = tEnv.executeSql(
   "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+// get the job status through the TableResult
+println(tableResult1.getJobClient().get().getJobStatus())
+
+//----------------------------------------------------------------------------
+// register another sink table named "GlassOrders" for multiple INSERT statements
+tEnv.executeSql("CREATE TABLE GlassOrders(product VARCHAR, amount INT) WITH (...)")
+
+// run multiple INSERT statements and emit the source table data to multiple sink tables
+val stmtSet = tEnv.createStatementSet()
+// the `addInsertSql` method accepts only a single INSERT statement at a time
+stmtSet.addInsertSql(
+  "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+stmtSet.addInsertSql(
+  "INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'")
+// execute all the INSERT statements just added
+val tableResult2 = stmtSet.execute()
+// get the job status through the TableResult
+println(tableResult2.getJobClient().get().getJobStatus())
+
 {% endhighlight %}
 </div>
 
 <div data-lang="python" markdown="1">
 {% highlight python %}
-settings = EnvironmentSettings.newInstance()...
-table_env = TableEnvironment.create(settings)
+settings = EnvironmentSettings.new_instance()...
+env = StreamExecutionEnvironment.get_execution_environment()
+table_env = StreamTableEnvironment.create(env, settings)
 
 # register a source table named "Orders" and a sink table named "RubberOrders"
-table_env.sqlUpdate("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
-table_env.sqlUpdate("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")
+table_env.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
+table_env.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)")
 
-# run an INSERT statement and emit the source table data to the sink table
-table_env \
-    .sqlUpdate("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+# run a single INSERT statement and emit the source table data to the sink table
+table_result1 = table_env \
+    .execute_sql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+# get the job status through the TableResult
+print(table_result1.get_job_client().get_job_status())
+
+#----------------------------------------------------------------------------
+# register another sink table named "GlassOrders" for multiple INSERT statements
+table_env.execute_sql("CREATE TABLE GlassOrders(product VARCHAR, amount INT) WITH (...)")
+
+# run multiple INSERT statements and emit the source table data to multiple sink tables
+stmt_set = table_env.create_statement_set()
+# the `add_insert_sql` method accepts only a single INSERT statement at a time
+stmt_set \
+    .add_insert_sql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+stmt_set \
+    .add_insert_sql("INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'")
+# execute all the INSERT statements just added
+table_result2 = stmt_set.execute()
+# get the job status through the TableResult
+print(table_result2.get_job_client().get_job_status())
+
 {% endhighlight %}
 </div>
 
diff --git a/docs/dev/table/sql/queries.md b/docs/dev/table/sql/queries.md
index add05e8..b667c29 100644
--- a/docs/dev/table/sql/queries.md
+++ b/docs/dev/table/sql/queries.md
@@ -25,7 +25,7 @@
 * This will be replaced by the TOC
 {:toc}
 
-SELECT queries are specified with the `sqlQuery()` method of the `TableEnvironment`. The method returns the result of the SELECT query as a `Table`. A `Table` can be used in [subsequent SQL and Table API queries]({{ site.baseurl }}/dev/table/common.html#mixing-table-api-and-sql), be [converted into a DataSet or DataStream]({{ site.baseurl }}/dev/table/common.html#integration-with-datastream-and-dataset-api), or [written to a TableSink]({{ site.baseurl }}/dev/table/common.html#emit-a-table). SQL and Table API queries can be seamlessly mixed and are holistically optimized and translated into a single program.
+SELECT statements and VALUES statements are specified with the `sqlQuery()` method of the `TableEnvironment`. The method returns the result of the SELECT statement (or the VALUES statement) as a `Table`. A `Table` can be used in [subsequent SQL and Table API queries]({{ site.baseurl }}/dev/table/common.html#mixing-table-api-and-sql), be [converted into a DataSet or DataStream]({{ site.baseurl }}/dev/table/common.html#integration-with-datastream-and-dataset-api), or [written to a TableSink]({{ site.baseurl }}/dev/table/common.html#emit-a-table). SQL and Table API queries can be seamlessly mixed and are holistically optimized and translated into a single program.
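+
+As a minimal sketch of the VALUES form (an inline table that needs no registered sources; the column names below are illustrative):
+
+{% highlight java %}
+// a VALUES clause builds an inline table; the alias assigns column names
+Table inlined = tableEnv.sqlQuery(
+  "SELECT id, name FROM (VALUES (1, 'ABC'), (2, 'DEF')) AS T(id, name)");
+{% endhighlight %}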
 
 In order to access a table in a SQL query, it must be [registered in the TableEnvironment]({{ site.baseurl }}/dev/table/common.html#register-tables-in-the-catalog). A table can be registered from a [TableSource]({{ site.baseurl }}/dev/table/common.html#register-a-tablesource), [Table]({{ site.baseurl }}/dev/table/common.html#register-a-table), [CREATE TABLE statement](#create-table), [DataStream, or DataSet]({{ site.baseurl }}/dev/table/common.html#register-a-datastream-or-dataset-as-table). Alternatively, users can also [register catalogs in a TableEnvironment]({{ site.baseurl }}/dev/table/catalogs.html) to specify the location of the data sources.
 
@@ -58,7 +58,6 @@
 Table result2 = tableEnv.sqlQuery(
   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
 
-// SQL update with a registered table
 // create and register a TableSink
 final Schema schema = new Schema()
     .field("product", DataTypes.STRING())
@@ -69,8 +68,8 @@
     .withSchema(schema)
     .createTemporaryTable("RubberOrders");
 
-// run a SQL update query on the Table and emit the result to the TableSink
-tableEnv.sqlUpdate(
+// run an INSERT SQL on the Table and emit the result to the TableSink
+tableEnv.executeSql(
   "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
 {% endhighlight %}
 </div>
@@ -95,7 +94,6 @@
 val result2 = tableEnv.sqlQuery(
   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
 
-// SQL update with a registered table
 // create and register a TableSink
 val schema = new Schema()
     .field("product", DataTypes.STRING())
@@ -106,8 +104,8 @@
     .withSchema(schema)
     .createTemporaryTable("RubberOrders")
 
-// run a SQL update query on the Table and emit the result to the TableSink
-tableEnv.sqlUpdate(
+// run an INSERT SQL on the Table and emit the result to the TableSink
+tableEnv.executeSql(
   "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
 {% endhighlight %}
 </div>
@@ -123,7 +121,6 @@
 result = table_env \
     .sql_query("SELECT SUM(amount) FROM %s WHERE product LIKE '%%Rubber%%'" % table)
 
-# SQL update with a registered table
 # create and register a TableSink
 t_env.connect(FileSystem().path("/path/to/file")))
     .with_format(Csv()
@@ -134,16 +131,107 @@
                  .field("amount", DataTypes.BIGINT()))
     .create_temporary_table("RubberOrders")
 
-# run a SQL update query on the Table and emit the result to the TableSink
+# run an INSERT SQL on the Table and emit the result to the TableSink
 table_env \
-    .sql_update("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+    .execute_sql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
 {% endhighlight %}
 </div>
 </div>
 
 {% top %}
 
-## Supported Syntax
+## Execute a Query
+
+A SELECT statement or a VALUES statement can be executed through the `TableEnvironment.executeSql()` method, which collects the query result to the local client. The method returns the result of the SELECT statement (or the VALUES statement) as a `TableResult`. Similarly, a `Table` object can be executed through the `Table.execute()` method to collect the content of the query to the local client.
+The `TableResult.collect()` method returns a closeable row iterator. The select job will not finish until all result data has been collected, so the iterator should be closed actively through the `CloseableIterator#close()` method to avoid a resource leak.
+The select result can also be printed to the client console through the `TableResult.print()` method. The result data in a `TableResult` can be accessed only once, so `collect()` and `print()` cannot both be called on the same result.
+
+For streaming jobs, the `TableResult.collect()` and `TableResult.print()` methods guarantee end-to-end exactly-once record delivery, which requires checkpointing to be enabled. Checkpointing is disabled by default; to enable it, set the checkpointing properties (see the <a href="{{ site.baseurl }}/ops/config.html#checkpointing">checkpointing config</a> for details) through `TableConfig`.
+A result record can therefore be accessed by the client only after its corresponding checkpoint completes.
+
+**Note:** For streaming mode, only append-only queries are supported for now.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+EnvironmentSettings settings = EnvironmentSettings.newInstance()...
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+// enable checkpointing
+tableEnv.getConfig().getConfiguration().set(
+  ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
+tableEnv.getConfig().getConfiguration().set(
+  ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10));
+
+tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
+
+// execute SELECT statement
+TableResult tableResult1 = tableEnv.executeSql("SELECT * FROM Orders");
+// use try-with-resources statement to make sure the iterator will be closed automatically
+try (CloseableIterator<Row> it = tableResult1.collect()) {
+    while(it.hasNext()) {
+        Row row = it.next();
+        // handle row
+    }
+}
+
+// execute Table
+TableResult tableResult2 = tableEnv.sqlQuery("SELECT * FROM Orders").execute();
+tableResult2.print();
+
+{% endhighlight %}
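+
+Because the result data can be accessed only once, `collect()` and `print()` are mutually exclusive on the same result. A short sketch of the constraint:
+
+{% highlight java %}
+TableResult onceOnly = tableEnv.executeSql("SELECT * FROM Orders");
+onceOnly.print();
+// calling onceOnly.collect() at this point would fail: the result was already consumed
+{% endhighlight %}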
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+val settings = EnvironmentSettings.newInstance()...
+val tableEnv = StreamTableEnvironment.create(env, settings)
+// enable checkpointing
+tableEnv.getConfig.getConfiguration.set(
+  ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
+tableEnv.getConfig.getConfiguration.set(
+  ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10))
+
+tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
+
+// execute SELECT statement
+val tableResult1 = tableEnv.executeSql("SELECT * FROM Orders")
+val it = tableResult1.collect()
+try while (it.hasNext) {
+  val row = it.next
+  // handle row
+}
+finally it.close() // close the iterator to avoid resource leak
+
+// execute Table
+val tableResult2 = tableEnv.sqlQuery("SELECT * FROM Orders").execute()
+tableResult2.print()
+
+{% endhighlight %}
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+settings = EnvironmentSettings.new_instance()...
+table_env = StreamTableEnvironment.create(env, settings)
+# enable checkpointing
+table_env.get_config().get_configuration().set_string("execution.checkpointing.mode", "EXACTLY_ONCE")
+table_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "10s")
+
+table_env.execute_sql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
+
+# execute SELECT statement
+table_result1 = table_env.execute_sql("SELECT * FROM Orders")
+table_result1.print()
+
+# execute Table
+table_result2 = table_env.sql_query("SELECT * FROM Orders").execute()
+table_result2.print()
+
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
+## Syntax
 
 Flink parses SQL using [Apache Calcite](https://calcite.apache.org/docs/reference.html), which supports standard ANSI SQL.
 
diff --git a/docs/dev/table/sql/queries.zh.md b/docs/dev/table/sql/queries.zh.md
index e4e5d69..62efaca 100644
--- a/docs/dev/table/sql/queries.zh.md
+++ b/docs/dev/table/sql/queries.zh.md
@@ -25,11 +25,11 @@
 * This will be replaced by the TOC
 {:toc}
 
-SELECT queries are specified with the `sqlQuery()` method of the `TableEnvironment`. The method returns the result of the SELECT query as a `Table`. A `Table` can be used in [subsequent SQL and Table API queries]({{ site.baseurl }}/zh/dev/table/common.html#mixing-table-api-and-sql), be [converted into a DataSet or DataStream]({{ site.baseurl }}/zh/dev/table/common.html#integration-with-datastream-and-dataset-api), or [written to a TableSink]({{ site.baseurl }}/zh/dev/table/common.html#emit-a-table). SQL and Table API queries can be seamlessly mixed and are holistically optimized and translated into a single program.
+SELECT statements and VALUES statements are specified with the `sqlQuery()` method of the `TableEnvironment`. The method returns the result of the SELECT statement (or the VALUES statement) as a `Table`. A `Table` can be used in [subsequent SQL and Table API queries]({{ site.baseurl }}/zh/dev/table/common.html#mixing-table-api-and-sql), be [converted into a DataSet or DataStream]({{ site.baseurl }}/zh/dev/table/common.html#integration-with-datastream-and-dataset-api), or [written to a TableSink]({{ site.baseurl }}/zh/dev/table/common.html#emit-a-table). SQL and Table API queries can be seamlessly mixed and are holistically optimized and translated into a single program.
 
 In order to access a table in a SQL query, it must be [registered in the TableEnvironment]({{ site.baseurl }}/zh/dev/table/common.html#register-tables-in-the-catalog). A table can be registered from a [TableSource]({{ site.baseurl }}/zh/dev/table/common.html#register-a-tablesource), a [Table]({{ site.baseurl }}/zh/dev/table/common.html#register-a-table), a [CREATE TABLE statement](create.html), or a [DataStream or DataSet]({{ site.baseurl }}/zh/dev/table/common.html#register-a-datastream-or-dataset-as-table). Alternatively, users can also [register catalogs in a TableEnvironment]({{ site.baseurl }}/zh/dev/table/catalogs.html) to specify the location of the data sources.
 
-For convenience, `Table.toString()` automatically registers the table under a unique name in its `TableEnvironment` and returns the name, so `Table` objects can be directly inlined into SQL queries as shown in the examples below.
+For convenience, `Table.toString()` automatically registers the table under a unique name in its `TableEnvironment` and returns the name, so `Table` objects can be directly inlined into SQL statements as shown in the examples below.
 
 **Note:** Queries that include unsupported SQL features will cause a `TableException`. The SQL features supported on batch and streaming tables are listed in the following sections.
 
@@ -58,7 +58,6 @@
 Table result2 = tableEnv.sqlQuery(
   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
 
-// SQL update with a registered table
 // create and register a TableSink
 final Schema schema = new Schema()
     .field("product", DataTypes.STRING())
@@ -69,8 +68,8 @@
     .withSchema(schema)
     .createTemporaryTable("RubberOrders");
 
-// run an update statement on the Table and emit the result to the TableSink
-tableEnv.sqlUpdate(
+// run an INSERT statement on the Table and emit the result to the TableSink
+tableEnv.executeSql(
   "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
 {% endhighlight %}
 </div>
@@ -88,14 +87,12 @@
 val result = tableEnv.sqlQuery(
   s"SELECT SUM(amount) FROM $table WHERE product LIKE '%Rubber%'")
 
-// SQL query with a registered table
 // register a DataStream under the name "Orders"
 tableEnv.createTemporaryView("Orders", ds, $"user", $"product", $"amount")
 // run a SQL query on the Table and retrieve the result as a new Table
 val result2 = tableEnv.sqlQuery(
   "SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
 
-// update a registered table with SQL
 // create and register a TableSink
 val schema = new Schema()
     .field("product", DataTypes.STRING())
@@ -106,8 +103,8 @@
     .withSchema(schema)
     .createTemporaryTable("RubberOrders")
 
-// run a SQL update on the Table and emit the result to the TableSink
-tableEnv.sqlUpdate(
+// run an INSERT operation on the Table and emit the result to the TableSink
+tableEnv.executeSql(
   "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
 {% endhighlight %}
 </div>
@@ -123,7 +120,6 @@
 result = table_env \
     .sql_query("SELECT SUM(amount) FROM %s WHERE product LIKE '%%Rubber%%'" % table)
 
-# SQL update with a registered table
 # create and register a TableSink
 t_env.connect(FileSystem().path("/path/to/file")))
     .with_format(Csv()
@@ -134,16 +130,106 @@
                  .field("amount", DataTypes.BIGINT()))
     .create_temporary_table("RubberOrders")
 
-# run a SQL update on the Table and emit the result to the TableSink
+# run an INSERT operation on the Table and emit the result to the TableSink
 table_env \
-    .sql_update("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
+    .execute_sql("INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'")
 {% endhighlight %}
 </div>
 </div>
 
 {% top %}
 
-## Supported Syntax
+## Execute a Query
+
+A SELECT statement or a VALUES statement can be executed through the `TableEnvironment.executeSql()` method, which collects the query result to the local client. The method returns a `TableResult` object that wraps the result of the query. Similarly, a `Table` object can be executed through the `Table.execute()` method to collect the content of the `Table` to the local client.
+The `TableResult.collect()` method returns a closeable row iterator. The query job will not finish until all result data has been collected, so the iterator should be closed actively through the `CloseableIterator#close()` method to avoid a resource leak.
+The query result can also be printed to the local console through the `TableResult.print()` method. The result data in a `TableResult` can be accessed only once, so `collect()` and `print()` cannot both be called on the same `TableResult` instance.
+
+For streaming mode, the `TableResult.collect()` and `TableResult.print()` methods guarantee end-to-end exactly-once record delivery, which requires checkpointing to be enabled. Checkpointing is disabled by default; to enable it, set the checkpointing properties (see the <a href="{{ site.baseurl }}/zh/ops/config.html#checkpointing">checkpointing config</a> for details) through `TableConfig`.
+A result record can therefore be accessed by the client only after its corresponding checkpoint completes.
+
+**Note:** For streaming mode, only append-only queries are supported for now, and checkpointing should be enabled so that results become accessible to the client.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+EnvironmentSettings settings = EnvironmentSettings.newInstance()...
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+// enable checkpointing
+tableEnv.getConfig().getConfiguration().set(
+  ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
+tableEnv.getConfig().getConfiguration().set(
+  ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10));
+
+tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
+
+// execute SELECT statement
+TableResult tableResult1 = tableEnv.executeSql("SELECT * FROM Orders");
+// use try-with-resources statement to make sure the iterator will be closed automatically
+try (CloseableIterator<Row> it = tableResult1.collect()) {
+    while(it.hasNext()) {
+        Row row = it.next();
+        // handle row
+    }
+}
+
+// execute Table
+TableResult tableResult2 = tableEnv.sqlQuery("SELECT * FROM Orders").execute();
+tableResult2.print();
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+val settings = EnvironmentSettings.newInstance()...
+val tableEnv = StreamTableEnvironment.create(env, settings)
+// enable checkpointing
+tableEnv.getConfig.getConfiguration.set(
+  ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE)
+tableEnv.getConfig.getConfiguration.set(
+  ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofSeconds(10))
+
+tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
+
+// execute SELECT statement
+val tableResult1 = tableEnv.executeSql("SELECT * FROM Orders")
+val it = tableResult1.collect()
+try while (it.hasNext) {
+  val row = it.next
+  // handle row
+}
+finally it.close() // close the iterator to avoid resource leak
+
+// execute Table
+val tableResult2 = tableEnv.sqlQuery("SELECT * FROM Orders").execute()
+tableResult2.print()
+
+{% endhighlight %}
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+env = StreamExecutionEnvironment.get_execution_environment()
+settings = EnvironmentSettings.new_instance()...
+table_env = StreamTableEnvironment.create(env, settings)
+# enable checkpointing
+table_env.get_config().get_configuration().set_string("execution.checkpointing.mode", "EXACTLY_ONCE")
+table_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "10s")
+
+table_env.execute_sql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)")
+
+# execute SELECT statement
+table_result1 = table_env.execute_sql("SELECT * FROM Orders")
+table_result1.print()
+
+# execute Table
+table_result2 = table_env.sql_query("SELECT * FROM Orders").execute()
+table_result2.print()
+
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
+## Syntax
 
 Flink parses SQL using [Apache Calcite](https://calcite.apache.org/docs/reference.html), which supports standard ANSI SQL.
 
diff --git a/docs/dev/table/streaming/query_configuration.md b/docs/dev/table/streaming/query_configuration.md
index bf84843..b677276 100644
--- a/docs/dev/table/streaming/query_configuration.md
+++ b/docs/dev/table/streaming/query_configuration.md
@@ -51,7 +51,7 @@
   sink);                       // table sink
 
 // emit result Table via a TableSink
-result.insertInto("outputTable");
+result.executeInsert("outputTable");
 
 // convert result Table into a DataStream<Row>
 DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class);
@@ -82,7 +82,7 @@
   sink)                           // table sink
 
 // emit result Table via a TableSink
-result.insertInto("outputTable")
+result.executeInsert("outputTable")
 
 // convert result Table into a DataStream[Row]
 val stream: DataStream[Row] = result.toAppendStream[Row]
@@ -110,7 +110,7 @@
                               sink)  # table sink
 
 # emit result Table via a TableSink
-result.insert_into("outputTable")
+result.execute_insert("outputTable")
 
 {% endhighlight %}
 </div>
diff --git a/docs/dev/table/streaming/query_configuration.zh.md b/docs/dev/table/streaming/query_configuration.zh.md
index bf84843..2fe7ab2 100644
--- a/docs/dev/table/streaming/query_configuration.zh.md
+++ b/docs/dev/table/streaming/query_configuration.zh.md
@@ -51,7 +51,7 @@
   sink);                       // table sink
 
 // emit result Table via a TableSink
-result.insertInto("outputTable");
+result.executeInsert("outputTable");
 
 // convert result Table into a DataStream<Row>
 DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class);
@@ -82,7 +82,7 @@
   sink)                           // table sink
 
 // emit result Table via a TableSink
-result.insertInto("outputTable")
+result.executeInsert("outputTable")
 
 // convert result Table into a DataStream[Row]
 val stream: DataStream[Row] = result.toAppendStream[Row]
diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index a4e1f5c..fb46087 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -2060,13 +2060,13 @@
         <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
       <td>
-        <p>Similar to the INSERT INTO clause in a SQL query. Performs a insertion into a registered output table.</p>
+        <p>Similar to the `INSERT INTO` clause in a SQL query, the method performs an insertion into a registered output table. The `executeInsert()` method immediately submits a Flink job that executes the insert operation.</p>
 
         <p>Output tables must be registered in the TableEnvironment (see <a href="common.html#connector-tables">Connector tables</a>). Moreover, the schema of the registered table must match the schema of the query.</p>
 
 {% highlight java %}
 Table orders = tableEnv.from("Orders");
-orders.insertInto("OutOrders");
+orders.executeInsert("OutOrders");
 {% endhighlight %}
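+
+        <p>Since the job is submitted immediately, the returned <code>TableResult</code> can be used to track it. A brief sketch, assuming blocking on the status future is acceptable:</p>
+
+{% highlight java %}
+// executeInsert returns a TableResult for the submitted job
+TableResult result = orders.executeInsert("OutOrders");
+// getJobStatus() returns a CompletableFuture<JobStatus>; join() blocks for it
+System.out.println(result.getJobClient().get().getJobStatus().join());
+{% endhighlight %}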
       </td>
     </tr>
@@ -2090,13 +2090,13 @@
         <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
       <td>
-        <p>Similar to the INSERT INTO clause in a SQL query. Performs a insertion into a registered output table.</p>
+        <p>Similar to the `INSERT INTO` clause in a SQL query, the method performs an insertion into a registered output table. The `executeInsert()` method immediately submits a Flink job that executes the insert operation.</p>
 
         <p>Output tables must be registered in the TableEnvironment (see <a href="common.html#connector-tables">Connector tables</a>). Moreover, the schema of the registered table must match the schema of the query.</p>
 
 {% highlight scala %}
 val orders: Table = tableEnv.from("Orders")
-orders.insertInto("OutOrders")
+orders.executeInsert("OutOrders")
 {% endhighlight %}
       </td>
     </tr>
@@ -2120,13 +2120,13 @@
         <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
       <td>
-        <p>Similar to the INSERT INTO clause in a SQL query. Performs a insertion into a registered output table.</p>
+        <p>Similar to the INSERT INTO clause in a SQL query, the method performs an insertion into a registered output table. The execute_insert method immediately submits a Flink job that executes the insert operation.</p>
 
         <p>Output tables must be registered in the TableEnvironment (see <a href="common.html#register-a-tablesink">Register a TableSink</a>). Moreover, the schema of the registered table must match the schema of the query.</p>
 
 {% highlight python %}
-orders = table_env.from_path("Orders");
-orders.insert_into("OutOrders");
+orders = table_env.from_path("Orders")
+orders.execute_insert("OutOrders")
 {% endhighlight %}
       </td>
     </tr>
diff --git a/docs/dev/table/tableApi.zh.md b/docs/dev/table/tableApi.zh.md
index 0ac1d80..f22ec9c 100644
--- a/docs/dev/table/tableApi.zh.md
+++ b/docs/dev/table/tableApi.zh.md
@@ -2059,13 +2059,13 @@
         <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
       <td>
-        <p>Similar to the INSERT INTO clause in a SQL query. Performs a insertion into a registered output table.</p>
+        <p>Similar to the `INSERT INTO` clause in a SQL query, the method performs an insertion into a registered output table. The `executeInsert()` method immediately submits a Flink job that executes the insert operation.</p>
 
         <p>Output tables must be registered in the TableEnvironment (see <a href="common.html#register-a-tablesink">Register a TableSink</a>). Moreover, the schema of the registered table must match the schema of the query.</p>
 
 {% highlight java %}
 Table orders = tableEnv.from("Orders");
-orders.insertInto("OutOrders");
+orders.executeInsert("OutOrders");
 {% endhighlight %}
       </td>
     </tr>
@@ -2089,13 +2089,13 @@
         <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
       </td>
       <td>
-        <p>Similar to the INSERT INTO clause in a SQL query. Performs a insertion into a registered output table.</p>
+        <p>Similar to the `INSERT INTO` clause in a SQL query, the method performs an insertion into a registered output table. The `executeInsert()` method immediately submits a Flink job that executes the insert operation.</p>
 
         <p>Output tables must be registered in the TableEnvironment (see <a href="common.html#connector-tables">Connector tables</a>). Moreover, the schema of the registered table must match the schema of the query.</p>
 
 {% highlight scala %}
 val orders: Table = tableEnv.from("Orders")
-orders.insertInto("OutOrders")
+orders.executeInsert("OutOrders")
 {% endhighlight %}
       </td>
     </tr>
@@ -2119,13 +2119,13 @@
         <span class="label label-primary">批处理</span> <span class="label label-primary">流处理</span>
       </td>
       <td>
-        <p>Similar to the INSERT INTO clause in a SQL query. Emits the data to a registered output table.</p>
+        <p>Similar to the INSERT INTO clause in a SQL query. Emits the data to a registered output table. The `execute_insert` method immediately submits a Flink job that triggers the insert operation.</p>
 
         <p>Output tables must first be registered in the TableEnvironment (see <a href="common.html#register-a-tablesink">Register a TableSink</a>). Moreover, the schema of the registered table must match the schema of the query result.</p>
 
 {% highlight python %}
-orders = table_env.from_path("Orders");
-orders.insert_into("OutOrders");
+orders = table_env.from_path("Orders")
+orders.execute_insert("OutOrders")
 {% endhighlight %}
       </td>
     </tr>