[FLINK-30595] support create table like
This closes #467
diff --git a/docs/content/docs/how-to/creating-tables.md b/docs/content/docs/how-to/creating-tables.md
index 8e3a656..eba5d91 100644
--- a/docs/content/docs/how-to/creating-tables.md
+++ b/docs/content/docs/how-to/creating-tables.md
@@ -112,6 +112,30 @@
{{< /hint >}}
+### Create Table Like
+
+{{< tabs "create-table-like" >}}
+
+{{< tab "Flink" >}}
+
+To create a table with the same schema, partition, and table properties as another table, use CREATE TABLE LIKE.
+
+```sql
+CREATE TABLE MyTable (
+ user_id BIGINT,
+ item_id BIGINT,
+ behavior STRING,
+ dt STRING,
+ hh STRING,
+ PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
+) ;
+
+CREATE TABLE MyTableLike LIKE MyTable;
+```
+
+{{< /tab >}}
+
+
### Table Properties
Users can specify table properties to enable features or improve performance of Table Store. For a complete list of such properties, see [configurations]({{< ref "docs/maintenance/configurations" >}}).
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
index 253714d..0309191 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.store.connector;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.core.fs.Path;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
@@ -203,10 +202,6 @@
// remove table path
String specific = options.remove(PATH.key());
if (specific != null) {
- if (!catalog.getTableLocation(tablePath).equals(new Path(specific))) {
- throw new IllegalArgumentException(
- "Illegal table path in table options: " + specific);
- }
catalogTable = catalogTable.copy(options);
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogTableITCase.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogTableITCase.java
index f2f652d..7516fe3 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogTableITCase.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogTableITCase.java
@@ -123,4 +123,14 @@
"+I[3, 1, [{\"id\":0,\"name\":\"a\",\"type\":\"INT\"},{\"id\":1,\"name\":\"b\",\"type\":\"INT\"}]]",
"+I[4, 1, [{\"id\":0,\"name\":\"a\",\"type\":\"INT\"},{\"id\":1,\"name\":\"b\",\"type\":\"INT\"}]]");
}
+
+ @Test
+ public void testCreateTableLike() throws Exception {
+ sql("CREATE TABLE T (a INT)");
+ sql("CREATE TABLE T1 LIKE T");
+ List<Row> result = sql("SELECT * FROM T1$schemas s");
+ System.out.println(result);
+ assertThat(result.toString())
+ .isEqualTo("[+I[0, [{\"id\":0,\"name\":\"a\",\"type\":\"INT\"}], [], [], {}, ]]");
+ }
}