[FLINK-31551] Add support for CrateDB (#29)

diff --git a/docs/content.zh/docs/connectors/table/jdbc.md b/docs/content.zh/docs/connectors/table/jdbc.md
index c9d35d2..063e0a3 100644
--- a/docs/content.zh/docs/connectors/table/jdbc.md
+++ b/docs/content.zh/docs/connectors/table/jdbc.md
@@ -48,13 +48,14 @@
 
 在连接到具体数据库时,也需要对应的驱动依赖,目前支持的驱动如下:
 
-| Driver      |      Group Id      |      Artifact Id       |      JAR         |
-| :-----------| :------------------| :----------------------| :----------------|
-| MySQL       |       `mysql`      | `mysql-connector-java` | [下载](https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/) |
-| Oracle      | `com.oracle.database.jdbc` |        `ojdbc8`        | [下载](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8)
-| PostgreSQL  |  `org.postgresql`  |      `postgresql`      | [下载](https://jdbc.postgresql.org/download/) |
-| Derby       | `org.apache.derby` |        `derby`         | [下载](http://db.apache.org/derby/derby_downloads.html) | |
-| SQL Server | `com.microsoft.sqlserver` |        `mssql-jdbc`         | [下载](https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver16) |
+| Driver     | Group Id                   | Artifact Id            |      JAR         |
+|:-----------|:---------------------------|:-----------------------| :----------------|
+| MySQL      | `mysql`                    | `mysql-connector-java` | [下载](https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/) |
+| Oracle     | `com.oracle.database.jdbc` | `ojdbc8`               | [下载](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) |
+| PostgreSQL | `org.postgresql`           | `postgresql`           | [下载](https://jdbc.postgresql.org/download/) |
+| Derby      | `org.apache.derby`         | `derby`                | [下载](http://db.apache.org/derby/derby_downloads.html) |
+| SQL Server | `com.microsoft.sqlserver`  | `mssql-jdbc`           | [下载](https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver16) |
+| CrateDB    | `io.crate`                 | `crate-jdbc`           | [下载](https://repo1.maven.org/maven2/io/crate/crate-jdbc/) |
 
 当前,JDBC 连接器和驱动不在 Flink 二进制发布包中,请参阅[这里]({{< ref "docs/dev/configuration/overview" >}})了解在集群上执行时如何连接它们。
 
@@ -609,7 +610,7 @@
 
 数据类型映射
 ----------------
-Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、Oracle、PostgreSQL、Derby 等。其中,Derby 通常是用于测试目的。下表列出了从关系数据库数据类型到 Flink SQL 数据类型的类型映射,映射表可以使得在 Flink 中定义 JDBC 表更加简单。
+Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、Oracle、PostgreSQL、CrateDB、Derby 等。其中,Derby 通常是用于测试目的。下表列出了从关系数据库数据类型到 Flink SQL 数据类型的类型映射,映射表可以使得在 Flink 中定义 JDBC 表更加简单。
 
 <table class="table table-bordered">
     <thead>
@@ -617,6 +618,7 @@
         <th class="text-left"><a href="https://dev.mysql.com/doc/refman/8.0/en/data-types.html">MySQL type</a></th>
         <th class="text-left"><a href="https://docs.oracle.com/database/121/SQLRF/sql_elements001.htm#SQLRF30020">Oracle type</a></th>
         <th class="text-left"><a href="https://www.postgresql.org/docs/12/datatype.html">PostgreSQL type</a></th>
+        <th class="text-left"><a href="https://crate.io/docs/crate/reference/en/master/general/ddl/data-types.html">CrateDB type</a></th>
         <th class="text-left"><a href="https://docs.microsoft.com/en-us/sql/t-sql/data-types/data-types-transact-sql?view=sql-server-ver16">SQL Server type</a></th>
         <th class="text-left"><a href="{{< ref "docs/dev/table/types" >}}">Flink SQL type</a></th>
       </tr>
@@ -626,6 +628,7 @@
       <td><code>TINYINT</code></td>
       <td></td>
       <td></td>
+      <td></td>
       <td><code>TINYINT</code></td>
       <td><code>TINYINT</code></td>
     </tr>
@@ -639,6 +642,9 @@
         <code>INT2</code><br>
         <code>SMALLSERIAL</code><br>
         <code>SERIAL2</code></td>
+      <td>
+        <code>SMALLINT</code><br>
+        <code>SHORT</code></td>
       <td><code>SMALLINT</code></td>
       <td><code>SMALLINT</code></td>
     </tr>
@@ -651,6 +657,9 @@
       <td>
         <code>INTEGER</code><br>
         <code>SERIAL</code></td>
+      <td>
+        <code>INTEGER</code><br>
+        <code>INT</code></td>
       <td><code>INT</code></td>
       <td><code>INT</code></td>
     </tr>
@@ -662,6 +671,9 @@
       <td>
         <code>BIGINT</code><br>
         <code>BIGSERIAL</code></td>
+      <td>
+        <code>BIGINT</code><br>
+        <code>LONG</code></td>
       <td><code>BIGINT</code></td>
       <td><code>BIGINT</code></td>
     </tr>
@@ -670,21 +682,19 @@
       <td></td>
       <td></td>
       <td></td>
+      <td></td>
       <td><code>DECIMAL(20, 0)</code></td>
     </tr>
     <tr>
-      <td><code>BIGINT</code></td>
-      <td></td>
-      <td><code>BIGINT</code></td>
-      <td><code>BIGINT</code></td>
-    </tr>
-    <tr>
       <td><code>FLOAT</code></td>
       <td>
         <code>BINARY_FLOAT</code></td>
       <td>
         <code>REAL</code><br>
         <code>FLOAT4</code></td>
+      <td>
+        <code>REAL</code><br>
+        <code>FLOAT</code></td>
       <td><code>REAL</code></td>
       <td><code>FLOAT</code></td>
     </tr>
@@ -696,6 +706,9 @@
       <td>
         <code>FLOAT8</code><br>
         <code>DOUBLE PRECISION</code></td>
+      <td>
+        <code>DOUBLE</code><br>
+        <code>DOUBLE PRECISION</code></td>
       <td><code>FLOAT</code></td>
       <td><code>DOUBLE</code></td>
     </tr>
@@ -712,6 +725,7 @@
       <td>
         <code>NUMERIC(p, s)</code><br>
         <code>DECIMAL(p, s)</code></td>
+      <td><code>NUMERIC(p, s)</code></td>
       <td><code>DECIMAL(p, s)</code></td>
       <td><code>DECIMAL(p, s)</code></td>
     </tr>
@@ -721,6 +735,7 @@
         <code>TINYINT(1)</code></td>
       <td></td>
       <td><code>BOOLEAN</code></td>
+      <td><code>BOOLEAN</code></td>
       <td><code>BIT</code></td>
       <td><code>BOOLEAN</code></td>
     </tr>
@@ -728,6 +743,7 @@
       <td><code>DATE</code></td>
       <td><code>DATE</code></td>
       <td><code>DATE</code></td>
+      <td><code>DATE</code> (only in expressions, not a stored type)</td>
       <td><code>DATE</code></td>
       <td><code>DATE</code></td>
     </tr>
@@ -735,6 +751,7 @@
       <td><code>TIME [(p)]</code></td>
       <td><code>DATE</code></td>
       <td><code>TIME [(p)] [WITHOUT TIMEZONE]</code></td>
+      <td><code>TIME</code> (only in expressions, not a stored type)</td>
       <td><code>TIME(0)</code></td>
       <td><code>TIME [(p)] [WITHOUT TIMEZONE]</code></td>
     </tr>
@@ -742,6 +759,7 @@
       <td><code>DATETIME [(p)]</code></td>
       <td><code>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</code></td>
       <td><code>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</code></td>
+      <td><code>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</code></td>
       <td>
         <code>DATETIME</code>
         <code>DATETIME2</code>
@@ -765,6 +783,13 @@
         <code>TEXT</code></td>
       <td>
         <code>CHAR(n)</code><br>
+        <code>CHARACTER(n)</code><br>
+        <code>VARCHAR(n)</code><br>
+        <code>CHARACTER VARYING(n)</code><br>
+        <code>TEXT</code><br>
+        <code>STRING</code></td>
+      <td>
+        <code>CHAR(n)</code><br>
         <code>NCHAR(n)</code><br>
         <code>VARCHAR(n)</code><br>
         <code>NVARCHAR(n)</code><br>
@@ -781,6 +806,7 @@
         <code>RAW(s)</code><br>
         <code>BLOB</code></td>
       <td><code>BYTEA</code></td>
+      <td></td>
       <td>
         <code>BINARY(n)</code><br>
         <code>VARBINARY(n)</code><br></td>
@@ -790,6 +816,7 @@
       <td></td>
       <td></td>
       <td><code>ARRAY</code></td>
+      <td><code>ARRAY</code></td>
       <td></td>
       <td><code>ARRAY</code></td>
     </tr>
diff --git a/docs/content/docs/connectors/table/jdbc.md b/docs/content/docs/connectors/table/jdbc.md
index 25bc205..0c5f6e9 100644
--- a/docs/content/docs/connectors/table/jdbc.md
+++ b/docs/content/docs/connectors/table/jdbc.md
@@ -45,13 +45,14 @@
 
 A driver dependency is also required to connect to a specified database. Here are drivers currently supported:
 
-| Driver     |      Group Id      |      Artifact Id       |      JAR         |
-|:-----------| :------------------| :----------------------| :----------------|
-| MySQL      |       `mysql`      | `mysql-connector-java` | [Download](https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/) |
-| Oracle     | `com.oracle.database.jdbc` |        `ojdbc8`        | [Download](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) |
-| PostgreSQL |  `org.postgresql`  |      `postgresql`      | [Download](https://jdbc.postgresql.org/download/) |
-| Derby      | `org.apache.derby` |        `derby`         | [Download](http://db.apache.org/derby/derby_downloads.html) |
-| SQL Server | `com.microsoft.sqlserver` |        `mssql-jdbc`         | [Download](https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver16) |
+| Driver     | Group Id                   | Artifact Id            |      JAR         |
+|:-----------|:---------------------------|:-----------------------| :----------------|
+| MySQL      | `mysql`                    | `mysql-connector-java` | [Download](https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/) |
+| Oracle     | `com.oracle.database.jdbc` | `ojdbc8`               | [Download](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) |
+| PostgreSQL | `org.postgresql`           | `postgresql`           | [Download](https://jdbc.postgresql.org/download/) |
+| Derby      | `org.apache.derby`         | `derby`                | [Download](http://db.apache.org/derby/derby_downloads.html) |
+| SQL Server | `com.microsoft.sqlserver`  | `mssql-jdbc`           | [Download](https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver16) |
+| CrateDB    | `io.crate`                 | `crate-jdbc`           | [Download](https://repo1.maven.org/maven2/io/crate/crate-jdbc/) |
 
 
 JDBC connector and drivers are not part of Flink's binary distribution. See how to link with them for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).
@@ -602,10 +603,42 @@
 SELECT * FROM mysql_catalog.given_database.test_table2;
 SELECT * FROM given_database.test_table2;
 ```
+
+### JDBC Catalog for CrateDB
+
+#### CrateDB Metaspace Mapping
+
+CrateDB is similar to PostgreSQL, but it has only one database, which defaults to `crate`. It has an additional namespace called `schema`: a CrateDB instance can have multiple schemas, with a default one named "doc", and each schema can have multiple tables.
+In Flink, when querying tables registered by the CrateDB catalog, users can use either `schema_name.table_name` or just `table_name`. The `schema_name` is optional and defaults to `doc`.
+
+Therefore, the metaspace mapping between Flink Catalog and CrateDB is as follows:
+
+| Flink Catalog Metaspace Structure    | CrateDB Metaspace Structure    |
+|:-------------------------------------|:-------------------------------|
+| catalog name (defined in Flink only) | N/A                            |
+| database name                        | database name (always `crate`) |
+| table name                           | [schema_name.]table_name       |
+
+The full path of a CrateDB table in Flink should be ``"<catalog>.<db>.`<schema.table>`"`` if a schema is specified; note that `<schema.table>` must be escaped.
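+
+Before running the examples below, the catalog itself has to be registered. Here is a minimal sketch using the `CREATE CATALOG` DDL of the JDBC catalog; the host, port and credentials are placeholders, and it assumes a CrateDB instance listening on its PostgreSQL wire-protocol port (5432 by default):
+
+```sql
+CREATE CATALOG mycatalog WITH(
+    'type' = 'jdbc',
+    'default-database' = 'crate',
+    'username' = 'crate',
+    'password' = '...',
+    'base-url' = 'jdbc:crate://localhost:5432/'
+);
+```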
+
+Here are some examples to access CrateDB tables:
+
+```sql
+-- scan table 'test_table' in the 'doc' schema (i.e. the default schema); the schema name can be omitted
+SELECT * FROM mycatalog.crate.doc.test_table;
+SELECT * FROM crate.doc.test_table;
+SELECT * FROM doc.test_table;
+SELECT * FROM test_table;
+
+-- scan table 'test_table2' in the 'custom_schema' schema;
+-- a custom schema cannot be omitted and must be escaped together with the table name.
+SELECT * FROM mycatalog.crate.`custom_schema.test_table2`;
+SELECT * FROM crate.`custom_schema.test_table2`;
+SELECT * FROM `custom_schema.test_table2`;
+```
 
 Data Type Mapping
 ----------------
-Flink supports connect to several databases which uses dialect like MySQL, Oracle, PostgreSQL, Derby. The Derby dialect usually used for testing purpose. The field data type mappings from relational databases data types to Flink SQL data types are listed in the following table, the mapping table can help define JDBC table in Flink easily.
+Flink supports connecting to several databases using dialects, such as MySQL, Oracle, PostgreSQL, CrateDB, and Derby. The Derby dialect is usually used for testing purposes. The field data type mappings from relational database data types to Flink SQL data types are listed in the following table; the mapping table can help with defining JDBC tables in Flink easily.
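+
+As an example of applying the mapping, a Flink table backed by a CrateDB table might be declared as below. This is only a sketch: the table name `crate_orders`, the CrateDB table `doc.orders`, and the connection settings are placeholders, and the column types follow the mapping table that follows.
+
+```sql
+CREATE TABLE crate_orders (
+    id    BIGINT,      -- CrateDB BIGINT / LONG
+    name  STRING,      -- CrateDB TEXT / STRING
+    price DOUBLE,      -- CrateDB DOUBLE / DOUBLE PRECISION
+    ts    TIMESTAMP    -- CrateDB TIMESTAMP WITHOUT TIME ZONE
+) WITH (
+    'connector' = 'jdbc',
+    'url' = 'jdbc:crate://localhost:5432/crate',
+    'table-name' = 'doc.orders',
+    'username' = 'crate',
+    'password' = '...'
+);
+```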
 
 <table class="table table-bordered">
     <thead>
@@ -613,6 +646,7 @@
         <th class="text-left"><a href="https://dev.mysql.com/doc/refman/8.0/en/data-types.html">MySQL type</a></th>
         <th class="text-left"><a href="https://docs.oracle.com/database/121/SQLRF/sql_elements001.htm#SQLRF30020">Oracle type</a></th>
         <th class="text-left"><a href="https://www.postgresql.org/docs/12/datatype.html">PostgreSQL type</a></th>
+        <th class="text-left"><a href="https://crate.io/docs/crate/reference/en/master/general/ddl/data-types.html">CrateDB type</a></th>
         <th class="text-left"><a href="https://docs.microsoft.com/en-us/sql/t-sql/data-types/data-types-transact-sql?view=sql-server-ver16">SQL Server type</a></th>
         <th class="text-left"><a href="{{< ref "docs/dev/table/types" >}}">Flink SQL type</a></th>
       </tr>
@@ -622,6 +656,7 @@
       <td><code>TINYINT</code></td>
       <td></td>
       <td></td>
+      <td></td>
       <td><code>TINYINT</code></td>
       <td><code>TINYINT</code></td>
     </tr>
@@ -635,6 +670,9 @@
         <code>INT2</code><br>
         <code>SMALLSERIAL</code><br>
         <code>SERIAL2</code></td>
+      <td>
+        <code>SMALLINT</code><br>
+        <code>SHORT</code></td>
       <td><code>SMALLINT</code></td>
       <td><code>SMALLINT</code></td>
     </tr>
@@ -647,6 +685,9 @@
       <td>
         <code>INTEGER</code><br>
         <code>SERIAL</code></td>
+      <td>
+        <code>INTEGER</code><br>
+        <code>INT</code></td>
       <td><code>INT</code></td>
       <td><code>INT</code></td>
     </tr>
@@ -658,6 +699,9 @@
       <td>
         <code>BIGINT</code><br>
         <code>BIGSERIAL</code></td>
+      <td>
+        <code>BIGINT</code><br>
+        <code>LONG</code></td>
       <td><code>BIGINT</code></td>
       <td><code>BIGINT</code></td>
     </tr>
@@ -666,6 +710,7 @@
       <td></td>
       <td></td>
       <td></td>
+      <td></td>
       <td><code>DECIMAL(20, 0)</code></td>
     </tr>
     <tr>
@@ -675,6 +720,9 @@
       <td>
         <code>REAL</code><br>
         <code>FLOAT4</code></td>
+      <td>
+        <code>REAL</code><br>
+        <code>FLOAT</code></td>
       <td><code>REAL</code></td>
       <td><code>FLOAT</code></td>
     </tr>
@@ -686,6 +734,9 @@
       <td>
         <code>FLOAT8</code><br>
         <code>DOUBLE PRECISION</code></td>
+      <td>
+        <code>DOUBLE</code><br>
+        <code>DOUBLE PRECISION</code></td>
       <td><code>FLOAT</code></td>
       <td><code>DOUBLE</code></td>
     </tr>
@@ -702,6 +753,7 @@
       <td>
         <code>NUMERIC(p, s)</code><br>
         <code>DECIMAL(p, s)</code></td>
+      <td><code>NUMERIC(p, s)</code></td>
       <td><code>DECIMAL(p, s)</code></td>
       <td><code>DECIMAL(p, s)</code></td>
     </tr>
@@ -711,6 +763,7 @@
         <code>TINYINT(1)</code></td>
       <td></td>
       <td><code>BOOLEAN</code></td>
+      <td><code>BOOLEAN</code></td>
       <td><code>BIT</code></td>
       <td><code>BOOLEAN</code></td>
     </tr>
@@ -718,6 +771,7 @@
       <td><code>DATE</code></td>
       <td><code>DATE</code></td>
       <td><code>DATE</code></td>
+      <td><code>DATE</code> (only in expressions, not a stored type)</td>
       <td><code>DATE</code></td>
       <td><code>DATE</code></td>
     </tr>
@@ -725,6 +779,7 @@
       <td><code>TIME [(p)]</code></td>
       <td><code>DATE</code></td>
       <td><code>TIME [(p)] [WITHOUT TIMEZONE]</code></td>
+      <td><code>TIME</code> (only in expressions, not a stored type)</td>
       <td><code>TIME(0)</code></td>
       <td><code>TIME [(p)] [WITHOUT TIMEZONE]</code></td>
     </tr>
@@ -732,6 +787,7 @@
       <td><code>DATETIME [(p)]</code></td>
       <td><code>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</code></td>
       <td><code>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</code></td>
+      <td><code>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</code></td>
       <td>
         <code>DATETIME</code>
         <code>DATETIME2</code>
@@ -755,6 +811,13 @@
         <code>TEXT</code></td>
       <td>
         <code>CHAR(n)</code><br>
+        <code>CHARACTER(n)</code><br>
+        <code>VARCHAR(n)</code><br>
+        <code>CHARACTER VARYING(n)</code><br>
+        <code>TEXT</code><br>
+        <code>STRING</code></td>
+      <td>
+        <code>CHAR(n)</code><br>
         <code>NCHAR(n)</code><br>
         <code>VARCHAR(n)</code><br>
         <code>NVARCHAR(n)</code><br>
@@ -771,6 +834,7 @@
         <code>RAW(s)</code><br>
         <code>BLOB</code></td>
       <td><code>BYTEA</code></td>
+      <td></td>
       <td>
         <code>BINARY(n)</code><br>
         <code>VARBINARY(n)</code><br></td>
@@ -780,6 +844,7 @@
       <td></td>
       <td></td>
       <td><code>ARRAY</code></td>
+      <td><code>ARRAY</code></td>
       <td></td>
       <td><code>ARRAY</code></td>
     </tr>
diff --git a/flink-connector-jdbc/pom.xml b/flink-connector-jdbc/pom.xml
index 2f2a662..0a298e8 100644
--- a/flink-connector-jdbc/pom.xml
+++ b/flink-connector-jdbc/pom.xml
@@ -55,7 +55,6 @@
 		</dependency>
 
 		<!-- Postgres -->
-
 		<dependency>
 			<groupId>org.postgresql</groupId>
 			<artifactId>postgresql</artifactId>
@@ -79,6 +78,20 @@
 			<scope>provided</scope>
 		</dependency>
 
+		<!-- CrateDB -->
+		<dependency>
+			<groupId>io.crate</groupId>
+			<artifactId>crate-jdbc</artifactId>
+			<version>2.7.0</version>
+			<scope>provided</scope>
+			<exclusions>
+				<exclusion>
+					<groupId>net.java.dev.jna</groupId>
+					<artifactId>jna</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+
 		<!-- Tests -->
 
 		<dependency>
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java
index a5c6c1e..84ac0ab 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.connector.jdbc.catalog;
 
+import org.apache.flink.connector.jdbc.databases.cratedb.catalog.CrateDBCatalog;
+import org.apache.flink.connector.jdbc.databases.cratedb.dialect.CrateDBDialect;
 import org.apache.flink.connector.jdbc.databases.mysql.catalog.MySqlCatalog;
 import org.apache.flink.connector.jdbc.databases.mysql.dialect.MySqlDialect;
 import org.apache.flink.connector.jdbc.databases.postgres.catalog.PostgresCatalog;
@@ -52,6 +54,9 @@
         if (dialect instanceof PostgresDialect) {
             return new PostgresCatalog(
                     userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
+        } else if (dialect instanceof CrateDBDialect) {
+            return new CrateDBCatalog(
+                    userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
         } else if (dialect instanceof MySqlDialect) {
             return new MySqlCatalog(
                     userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractPostgresCompatibleRowConverter.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractPostgresCompatibleRowConverter.java
new file mode 100644
index 0000000..86c6f16
--- /dev/null
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractPostgresCompatibleRowConverter.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.converter;
+
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+
+import java.lang.reflect.Array;
+
+/**
+ * Runtime converter that is responsible for converting between JDBC objects and Flink internal
+ * objects for PostgreSQL-compatible databases.
+ */
+public abstract class AbstractPostgresCompatibleRowConverter<T extends java.sql.Array>
+        extends AbstractJdbcRowConverter {
+
+    private static final long serialVersionUID = 1L;
+
+    protected AbstractPostgresCompatibleRowConverter(RowType rowType) {
+        super(rowType);
+    }
+
+    @Override
+    public JdbcDeserializationConverter createInternalConverter(LogicalType type) {
+        LogicalTypeRoot root = type.getTypeRoot();
+
+        if (root == LogicalTypeRoot.ARRAY) {
+            ArrayType arrayType = (ArrayType) type;
+            return createPostgresArrayConverter(arrayType);
+        } else {
+            return createPrimitiveConverter(type);
+        }
+    }
+
+    @Override
+    protected JdbcSerializationConverter createNullableExternalConverter(LogicalType type) {
+        LogicalTypeRoot root = type.getTypeRoot();
+        if (root == LogicalTypeRoot.ARRAY) {
+            // Note: writing the ARRAY type is not yet supported by the PostgreSQL-compatible dialects.
+            return (val, index, statement) -> {
+                throw new IllegalStateException(
+                        String.format(
+                                "Writing ARRAY type is not yet supported in JDBC:%s.",
+                                converterName()));
+            };
+        } else {
+            return super.createNullableExternalConverter(type);
+        }
+    }
+
+    private JdbcDeserializationConverter createPostgresArrayConverter(ArrayType arrayType) {
+        // Since PGJDBC 42.2.15 (https://github.com/pgjdbc/pgjdbc/pull/1194) bytea[] is wrapped in
+        // primitive byte arrays
+        final Class<?> elementClass =
+                LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType());
+        final JdbcDeserializationConverter elementConverter =
+                createNullableInternalConverter(arrayType.getElementType());
+        return val -> {
+            @SuppressWarnings("unchecked")
+            T pgArray = (T) val;
+            Object[] in = (Object[]) pgArray.getArray();
+            final Object[] array = (Object[]) Array.newInstance(elementClass, in.length);
+            for (int i = 0; i < in.length; i++) {
+                array[i] = elementConverter.deserialize(in[i]);
+            }
+            return new GenericArrayData(array);
+        };
+    }
+
+    // Kept as a separate method so that Postgres-compatible converters can support primitives
+    // that the superclass doesn't support in the future
+    private JdbcDeserializationConverter createPrimitiveConverter(LogicalType type) {
+        return super.createInternalConverter(type);
+    }
+}
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalog.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalog.java
new file mode 100644
index 0000000..8eb0927
--- /dev/null
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalog.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.cratedb.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.databases.postgres.catalog.PostgresCatalog;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.commons.compress.utils.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/** Catalog for CrateDB. */
+@Internal
+public class CrateDBCatalog extends PostgresCatalog {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CrateDBCatalog.class);
+
+    public static final String DEFAULT_DATABASE = "crate";
+
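+    // Schemas that ship with CrateDB; they are excluded when listing user tables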
+    private static final Set<String> builtinSchemas =
+            new HashSet<String>() {
+                {
+                    add("pg_catalog");
+                    add("information_schema");
+                    add("sys");
+                }
+            };
+
+    public CrateDBCatalog(
+            ClassLoader userClassLoader,
+            String catalogName,
+            String defaultDatabase,
+            String username,
+            String pwd,
+            String baseUrl) {
+        super(
+                userClassLoader,
+                catalogName,
+                defaultDatabase,
+                username,
+                pwd,
+                baseUrl,
+                new CrateDBTypeMapper());
+    }
+
+    // ------ databases ------
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        return ImmutableList.of(DEFAULT_DATABASE);
+    }
+
+    // ------ schemas ------
+
+    protected Set<String> getBuiltinSchemas() {
+        return builtinSchemas;
+    }
+
+    // ------ tables ------
+
+    @Override
+    protected List<String> getPureTables(Connection conn, List<String> schemas)
+            throws SQLException {
+        List<String> tables = Lists.newArrayList();
+
+        // The statement selects only table_name, so column index 1 is the table name
+        try (PreparedStatement ps =
+                conn.prepareStatement(
+                        "SELECT table_name FROM information_schema.tables "
+                                + "WHERE table_schema = ? "
+                                + "ORDER BY table_type, table_name")) {
+            for (String schema : schemas) {
+                extractColumnValuesByStatement(ps, 1, null, schema).stream()
+                        .map(pureTable -> schema + "." + pureTable)
+                        .forEach(tables::add);
+            }
+            return tables;
+        }
+    }
+
+    @Override
+    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+        List<String> tables;
+        try {
+            tables = listTables(tablePath.getDatabaseName());
+        } catch (DatabaseNotExistException e) {
+            return false;
+        }
+
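+        // CrateDB resolves unqualified table names against the connection's search_path,
+        // so check every schema on that path in addition to the qualified name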
+        String searchPath =
+                extractColumnValuesBySQL(baseUrl + DEFAULT_DATABASE, "show search_path", 1, null)
+                        .get(0);
+        String[] schemas = searchPath.split("\\s*,\\s*");
+
+        if (tables.contains(getSchemaTableName(tablePath))) {
+            return true;
+        }
+        for (String schema : schemas) {
+            if (tables.contains(schema + "." + tablePath.getObjectName())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    protected String getTableName(ObjectPath tablePath) {
+        return CrateDBTablePath.fromFlinkTableName(tablePath.getObjectName()).getPgTableName();
+    }
+
+    @Override
+    protected String getSchemaName(ObjectPath tablePath) {
+        return CrateDBTablePath.fromFlinkTableName(tablePath.getObjectName()).getPgSchemaName();
+    }
+
+    @Override
+    protected String getSchemaTableName(ObjectPath tablePath) {
+        return CrateDBTablePath.fromFlinkTableName(tablePath.getObjectName()).getFullPath();
+    }
+}
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBTablePath.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBTablePath.java
new file mode 100644
index 0000000..db6a995
--- /dev/null
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBTablePath.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.cratedb.catalog;
+
+import org.apache.flink.connector.jdbc.databases.postgres.catalog.PostgresTablePath;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Table path of CrateDB in Flink. Can be in the format "table_name" or "schema_name.table_name".
+ * When it is "table_name", the schema name defaults to "doc".
+ */
+public class CrateDBTablePath extends PostgresTablePath {
+
+    private static final String DEFAULT_CRATE_SCHEMA_NAME = "doc";
+
+    public CrateDBTablePath(String pgSchemaName, String pgTableName) {
+        super(pgSchemaName, pgTableName);
+    }
+
+    public static CrateDBTablePath fromFlinkTableName(String flinkTableName) {
+        if (flinkTableName.contains(".")) {
+            String[] path = flinkTableName.split("\\.");
+
+            checkArgument(
+                    path.length == 2,
+                    String.format(
+                            "Table name '%s' is not valid. The parsed length is %d",
+                            flinkTableName, path.length));
+
+            return new CrateDBTablePath(path[0], path[1]);
+        } else {
+            return new CrateDBTablePath(getDefaultSchemaName(), flinkTableName);
+        }
+    }
+
+    public static String toFlinkTableName(String schema, String table) {
+        return new PostgresTablePath(schema, table).getFullPath();
+    }
+
+    protected static String getDefaultSchemaName() {
+        return DEFAULT_CRATE_SCHEMA_NAME;
+    }
+}
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBTypeMapper.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBTypeMapper.java
new file mode 100644
index 0000000..3532a1e
--- /dev/null
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBTypeMapper.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.cratedb.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.databases.postgres.catalog.PostgresTypeMapper;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** CrateDBTypeMapper util class. */
+@Internal
+public class CrateDBTypeMapper extends PostgresTypeMapper {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CrateDBTypeMapper.class);
+
+    // The CrateDB JDBC driver uses a mapping very similar to the
+    // PostgreSQL driver's, and adds some extras:
+    private static final String PG_STRING = "string";
+    private static final String PG_STRING_ARRAY = "_string";
+
+    @Override
+    protected DataType getMapping(String pgType, int precision, int scale) {
+        switch (pgType) {
+            case PG_SERIAL:
+            case PG_BIGSERIAL:
+                return null;
+            case PG_STRING:
+                return DataTypes.STRING();
+            case PG_STRING_ARRAY:
+                return DataTypes.ARRAY(DataTypes.STRING());
+            default:
+                return super.getMapping(pgType, precision, scale);
+        }
+    }
+
+    @Override
+    protected String getDBType() {
+        return "CrateDB";
+    }
+}
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialect.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialect.java
new file mode 100644
index 0000000..7592cf2
--- /dev/null
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialect.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.cratedb.dialect;
+
+import org.apache.flink.connector.jdbc.dialect.AbstractPostgresCompatibleDialect;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Optional;
+
+/** JDBC dialect for CrateDB. */
+public class CrateDBDialect extends AbstractPostgresCompatibleDialect {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public String dialectName() {
+        return "CrateDB";
+    }
+
+    @Override
+    public CrateDBRowConverter getRowConverter(RowType rowType) {
+        return new CrateDBRowConverter(rowType);
+    }
+
+    @Override
+    public Optional<String> defaultDriverName() {
+        return Optional.of("io.crate.client.jdbc.CrateDriver");
+    }
+}
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialectFactory.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialectFactory.java
new file mode 100644
index 0000000..ac5a8e7
--- /dev/null
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBDialectFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.cratedb.dialect;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
+
+/** Factory for {@link CrateDBDialect}. */
+@Internal
+public class CrateDBDialectFactory implements JdbcDialectFactory {
+    @Override
+    public boolean acceptsURL(String url) {
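+        // CrateDB JDBC URLs take the form jdbc:crate://<host>:<port>/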
+        return url.startsWith("jdbc:crate:");
+    }
+
+    @Override
+    public JdbcDialect create() {
+        return new CrateDBDialect();
+    }
+}
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBRowConverter.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBRowConverter.java
new file mode 100644
index 0000000..1412b0d
--- /dev/null
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/cratedb/dialect/CrateDBRowConverter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.cratedb.dialect;
+
+import org.apache.flink.connector.jdbc.converter.AbstractPostgresCompatibleRowConverter;
+import org.apache.flink.table.types.logical.RowType;
+
+import io.crate.shade.org.postgresql.jdbc.PgArray;
+
+/**
+ * Runtime converter that is responsible for converting between JDBC objects and Flink internal
+ * objects for CrateDB.
+ */
+public class CrateDBRowConverter extends AbstractPostgresCompatibleRowConverter<PgArray> {
+
+    private static final long serialVersionUID = 1L;
+
+    public CrateDBRowConverter(RowType rowType) {
+        super(rowType);
+    }
+
+    @Override
+    public String converterName() {
+        return "CrateDB";
+    }
+}
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresCatalog.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresCatalog.java
index 61f43ba..7ece9b6 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresCatalog.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresCatalog.java
@@ -70,7 +70,7 @@
                 }
             };
 
-    private final JdbcDialectTypeMapper dialectTypeMapper;
+    protected final JdbcDialectTypeMapper dialectTypeMapper;
 
     public PostgresCatalog(
             ClassLoader userClassLoader,
@@ -79,8 +79,26 @@
             String username,
             String pwd,
             String baseUrl) {
+        this(
+                userClassLoader,
+                catalogName,
+                defaultDatabase,
+                username,
+                pwd,
+                baseUrl,
+                new PostgresTypeMapper());
+    }
+
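+    /** Variant of the constructor above that lets PostgreSQL-compatible catalogs supply their own type mapper. */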
+    protected PostgresCatalog(
+            ClassLoader userClassLoader,
+            String catalogName,
+            String defaultDatabase,
+            String username,
+            String pwd,
+            String baseUrl,
+            JdbcDialectTypeMapper dialectTypeMapper) {
         super(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
-        this.dialectTypeMapper = new PostgresTypeMapper();
+        this.dialectTypeMapper = dialectTypeMapper;
     }
 
     // ------ databases ------
@@ -95,8 +113,35 @@
                 dbName -> !builtinDatabases.contains(dbName));
     }
 
+    // ------ schemas ------
+
+    protected Set<String> getBuiltinSchemas() {
+        return builtinSchemas;
+    }
+
     // ------ tables ------
 
+    protected List<String> getPureTables(Connection conn, List<String> schemas)
+            throws SQLException {
+        List<String> tables = Lists.newArrayList();
+
+        try (PreparedStatement ps =
+                conn.prepareStatement(
+                        "SELECT * FROM information_schema.tables "
+                                + "WHERE table_type = 'BASE TABLE' "
+                                + "AND table_schema = ? "
+                                + "ORDER BY table_type, table_name;")) {
+            for (String schema : schemas) {
+                // Column index 1 is database name, 2 is schema name, 3 is table name
+                extractColumnValuesByStatement(ps, 3, null, schema).stream()
+                        .map(pureTable -> schema + "." + pureTable)
+                        .forEach(tables::add);
+            }
+            return tables;
+        }
+    }
+
     @Override
     public List<String> listTables(String databaseName)
             throws DatabaseNotExistException, CatalogException {
@@ -107,8 +152,6 @@
             throw new DatabaseNotExistException(getName(), databaseName);
         }
 
-        List<String> tables = Lists.newArrayList();
-
         final String url = baseUrl + databaseName;
         try (Connection conn = DriverManager.getConnection(url, username, pwd)) {
             // get all schemas
@@ -117,28 +160,15 @@
                     conn.prepareStatement("SELECT schema_name FROM information_schema.schemata;")) {
                 schemas =
                         extractColumnValuesByStatement(
-                                ps, 1, pgSchema -> !builtinSchemas.contains(pgSchema));
+                                ps, 1, pgSchema -> !getBuiltinSchemas().contains(pgSchema));
             }
 
             // get all tables
-            try (PreparedStatement ps =
-                    conn.prepareStatement(
-                            "SELECT * FROM information_schema.tables "
-                                    + "WHERE table_type = 'BASE TABLE' "
-                                    + "AND table_schema = ? "
-                                    + "ORDER BY table_type, table_name;")) {
-                for (String schema : schemas) {
-                    // Column index 1 is database name, 2 is schema name, 3 is table name
-                    extractColumnValuesByStatement(ps, 3, null, schema).stream()
-                            .map(pureTable -> schema + "." + pureTable)
-                            .forEach(tables::add);
-                }
-            }
+            return getPureTables(conn, schemas);
         } catch (Exception e) {
             throw new CatalogException(
                     String.format("Failed to list tables for database %s", databaseName), e);
         }
-        return tables;
     }
 
     /**
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresTablePath.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresTablePath.java
index 199a081..d811e72 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresTablePath.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresTablePath.java
@@ -38,8 +38,12 @@
     private final String pgTableName;
 
     public PostgresTablePath(String pgSchemaName, String pgTableName) {
-        checkArgument(!StringUtils.isNullOrWhitespaceOnly(pgSchemaName));
-        checkArgument(!StringUtils.isNullOrWhitespaceOnly(pgTableName));
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(pgSchemaName),
+                "Schema name is not valid. Null or empty is not allowed");
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(pgTableName),
+                "Table name is not valid. Null or empty is not allowed");
 
         this.pgSchemaName = pgSchemaName;
         this.pgTableName = pgTableName;
@@ -57,7 +61,7 @@
 
             return new PostgresTablePath(path[0], path[1]);
         } else {
-            return new PostgresTablePath(DEFAULT_POSTGRES_SCHEMA_NAME, flinkTableName);
+            return new PostgresTablePath(getDefaultSchemaName(), flinkTableName);
         }
     }
 
@@ -77,6 +81,10 @@
         return pgSchemaName;
     }
 
+    protected static String getDefaultSchemaName() {
+        return DEFAULT_POSTGRES_SCHEMA_NAME;
+    }
+
     @Override
     public String toString() {
         return getFullPath();
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresTypeMapper.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresTypeMapper.java
index b2769d7..f213128 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresTypeMapper.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresTypeMapper.java
@@ -50,8 +50,8 @@
     // boolean <=> bool
     // decimal <=> numeric
     private static final String PG_SMALLSERIAL = "smallserial";
-    private static final String PG_SERIAL = "serial";
-    private static final String PG_BIGSERIAL = "bigserial";
+    protected static final String PG_SERIAL = "serial";
+    protected static final String PG_BIGSERIAL = "bigserial";
     private static final String PG_BYTEA = "bytea";
     private static final String PG_BYTEA_ARRAY = "_bytea";
     private static final String PG_SMALLINT = "int2";
@@ -93,6 +93,15 @@
         int precision = metadata.getPrecision(colIndex);
         int scale = metadata.getScale(colIndex);
 
+        DataType dataType = getMapping(pgType, precision, scale);
+        if (dataType == null) {
+            throw new UnsupportedOperationException(
+                    String.format("Doesn't support %s type '%s' yet", getDBType(), pgType));
+        }
+        return dataType;
+    }
+
+    protected DataType getMapping(String pgType, int precision, int scale) {
         switch (pgType) {
             case PG_BOOLEAN:
                 return DataTypes.BOOLEAN();
@@ -128,14 +137,13 @@
             case PG_NUMERIC:
                 // see SPARK-26538: handle numeric without explicit precision and scale.
                 if (precision > 0) {
-                    return DataTypes.DECIMAL(precision, metadata.getScale(colIndex));
+                    return DataTypes.DECIMAL(precision, scale);
                 }
                 return DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18);
             case PG_NUMERIC_ARRAY:
                 // see SPARK-26538: handle numeric without explicit precision and scale.
                 if (precision > 0) {
-                    return DataTypes.ARRAY(
-                            DataTypes.DECIMAL(precision, metadata.getScale(colIndex)));
+                    return DataTypes.ARRAY(DataTypes.DECIMAL(precision, scale));
                 }
                 return DataTypes.ARRAY(DataTypes.DECIMAL(DecimalType.MAX_PRECISION, 18));
             case PG_CHAR:
@@ -169,8 +177,11 @@
             case PG_DATE_ARRAY:
                 return DataTypes.ARRAY(DataTypes.DATE());
             default:
-                throw new UnsupportedOperationException(
-                        String.format("Doesn't support Postgres type '%s' yet", pgType));
+                return null;
         }
     }
+
+    protected String getDBType() {
+        return "Postgres";
+    }
 }
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java
index f5b4af2..d0924ae 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresDialect.java
@@ -19,114 +19,29 @@
 package org.apache.flink.connector.jdbc.databases.postgres.dialect;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
-import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
-import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.connector.jdbc.dialect.AbstractPostgresCompatibleDialect;
 import org.apache.flink.table.types.logical.RowType;
 
-import java.util.Arrays;
-import java.util.EnumSet;
 import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
 
-/** JDBC dialect for PostgresSQL. */
+/** JDBC dialect for PostgreSQL. */
 @Internal
-public class PostgresDialect extends AbstractDialect {
+public class PostgresDialect extends AbstractPostgresCompatibleDialect {
 
     private static final long serialVersionUID = 1L;
 
-    // Define MAX/MIN precision of TIMESTAMP type according to PostgreSQL docs:
-    // https://www.postgresql.org/docs/12/datatype-datetime.html
-    private static final int MAX_TIMESTAMP_PRECISION = 6;
-    private static final int MIN_TIMESTAMP_PRECISION = 1;
-
-    // Define MAX/MIN precision of DECIMAL type according to PostgreSQL docs:
-    // https://www.postgresql.org/docs/12/datatype-numeric.html#DATATYPE-NUMERIC-DECIMAL
-    private static final int MAX_DECIMAL_PRECISION = 1000;
-    private static final int MIN_DECIMAL_PRECISION = 1;
-
     @Override
-    public JdbcRowConverter getRowConverter(RowType rowType) {
+    public PostgresRowConverter getRowConverter(RowType rowType) {
         return new PostgresRowConverter(rowType);
     }
 
     @Override
-    public String getLimitClause(long limit) {
-        return "LIMIT " + limit;
-    }
-
-    @Override
     public Optional<String> defaultDriverName() {
         return Optional.of("org.postgresql.Driver");
     }
 
-    /** Postgres upsert query. It use ON CONFLICT ... DO UPDATE SET.. to replace into Postgres. */
-    @Override
-    public Optional<String> getUpsertStatement(
-            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
-        String uniqueColumns =
-                Arrays.stream(uniqueKeyFields)
-                        .map(this::quoteIdentifier)
-                        .collect(Collectors.joining(", "));
-        String updateClause =
-                Arrays.stream(fieldNames)
-                        .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
-                        .collect(Collectors.joining(", "));
-        return Optional.of(
-                getInsertIntoStatement(tableName, fieldNames)
-                        + " ON CONFLICT ("
-                        + uniqueColumns
-                        + ")"
-                        + " DO UPDATE SET "
-                        + updateClause);
-    }
-
-    @Override
-    public String quoteIdentifier(String identifier) {
-        return identifier;
-    }
-
     @Override
     public String dialectName() {
         return "PostgreSQL";
     }
-
-    @Override
-    public Optional<Range> decimalPrecisionRange() {
-        return Optional.of(Range.of(MIN_DECIMAL_PRECISION, MAX_DECIMAL_PRECISION));
-    }
-
-    @Override
-    public Optional<Range> timestampPrecisionRange() {
-        return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION));
-    }
-
-    @Override
-    public Set<LogicalTypeRoot> supportedTypes() {
-        // The data types used in PostgreSQL are list at:
-        // https://www.postgresql.org/docs/12/datatype.html
-
-        // TODO: We can't convert BINARY data type to
-        //  PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in
-        // LegacyTypeInfoDataTypeConverter.
-
-        return EnumSet.of(
-                LogicalTypeRoot.CHAR,
-                LogicalTypeRoot.VARCHAR,
-                LogicalTypeRoot.BOOLEAN,
-                LogicalTypeRoot.VARBINARY,
-                LogicalTypeRoot.DECIMAL,
-                LogicalTypeRoot.TINYINT,
-                LogicalTypeRoot.SMALLINT,
-                LogicalTypeRoot.INTEGER,
-                LogicalTypeRoot.BIGINT,
-                LogicalTypeRoot.FLOAT,
-                LogicalTypeRoot.DOUBLE,
-                LogicalTypeRoot.DATE,
-                LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE,
-                LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
-                LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
-                LogicalTypeRoot.ARRAY);
-    }
 }
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresRowConverter.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresRowConverter.java
index 4f302cc..2e6e14c 100644
--- a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresRowConverter.java
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/postgres/dialect/PostgresRowConverter.java
@@ -18,84 +18,25 @@
 
 package org.apache.flink.connector.jdbc.databases.postgres.dialect;
 
-import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
-import org.apache.flink.table.data.GenericArrayData;
-import org.apache.flink.table.types.logical.ArrayType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.connector.jdbc.converter.AbstractPostgresCompatibleRowConverter;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
 
 import org.postgresql.jdbc.PgArray;
 
-import java.lang.reflect.Array;
-
 /**
  * Runtime converter responsible for converting between JDBC objects and Flink internal objects
  * for PostgreSQL.
  */
-public class PostgresRowConverter extends AbstractJdbcRowConverter {
+public class PostgresRowConverter extends AbstractPostgresCompatibleRowConverter<PgArray> {
 
     private static final long serialVersionUID = 1L;
 
-    @Override
-    public String converterName() {
-        return "PostgreSQL";
-    }
-
     public PostgresRowConverter(RowType rowType) {
         super(rowType);
     }
 
     @Override
-    public JdbcDeserializationConverter createInternalConverter(LogicalType type) {
-        LogicalTypeRoot root = type.getTypeRoot();
-
-        if (root == LogicalTypeRoot.ARRAY) {
-            ArrayType arrayType = (ArrayType) type;
-            return createPostgresArrayConverter(arrayType);
-        } else {
-            return createPrimitiveConverter(type);
-        }
-    }
-
-    @Override
-    protected JdbcSerializationConverter createNullableExternalConverter(LogicalType type) {
-        LogicalTypeRoot root = type.getTypeRoot();
-        if (root == LogicalTypeRoot.ARRAY) {
-            // note:Writing ARRAY type is not yet supported by PostgreSQL dialect now.
-            return (val, index, statement) -> {
-                throw new IllegalStateException(
-                        String.format(
-                                "Writing ARRAY type is not yet supported in JDBC:%s.",
-                                converterName()));
-            };
-        } else {
-            return super.createNullableExternalConverter(type);
-        }
-    }
-
-    private JdbcDeserializationConverter createPostgresArrayConverter(ArrayType arrayType) {
-        // Since PGJDBC 42.2.15 (https://github.com/pgjdbc/pgjdbc/pull/1194) bytea[] is wrapped in
-        // primitive byte arrays
-        final Class<?> elementClass =
-                LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType());
-        final JdbcDeserializationConverter elementConverter =
-                createNullableInternalConverter(arrayType.getElementType());
-        return val -> {
-            PgArray pgArray = (PgArray) val;
-            Object[] in = (Object[]) pgArray.getArray();
-            final Object[] array = (Object[]) Array.newInstance(elementClass, in.length);
-            for (int i = 0; i < in.length; i++) {
-                array[i] = elementConverter.deserialize(in[i]);
-            }
-            return new GenericArrayData(array);
-        };
-    }
-
-    // Have its own method so that Postgres can support primitives that super class doesn't support
-    // in the future
-    private JdbcDeserializationConverter createPrimitiveConverter(LogicalType type) {
-        return super.createInternalConverter(type);
+    public String converterName() {
+        return "PostgreSQL";
     }
 }
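
The array-handling logic removed above now lives in the shared `AbstractPostgresCompatibleRowConverter`. As a rough illustration of the deserialization pattern it preserves, here is a minimal, self-contained sketch; the `ElementConverter` interface is a simplified stand-in for the connector's per-element converter, not connector API:

```java
import java.lang.reflect.Array;
import java.sql.SQLException;

public class ArrayConversionSketch {
    /** Stand-in for the connector's per-element deserialization converter. */
    interface ElementConverter {
        Object deserialize(Object value);
    }

    /** Unwraps a JDBC array and converts it element by element. */
    static Object[] convert(java.sql.Array sqlArray, Class<?> elementClass,
                            ElementConverter converter) throws SQLException {
        Object[] in = (Object[]) sqlArray.getArray();
        Object[] out = (Object[]) Array.newInstance(elementClass, in.length);
        for (int i = 0; i < in.length; i++) {
            out[i] = converter.deserialize(in[i]); // null handling is the converter's job
        }
        return out; // the connector wraps this in GenericArrayData
    }
}
```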
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java
new file mode 100644
index 0000000..0ca425f
--- /dev/null
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect;
+
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** JDBC dialect for PostgreSQL-compatible databases. */
+public abstract class AbstractPostgresCompatibleDialect extends AbstractDialect {
+
+    private static final long serialVersionUID = 1L;
+
+    // Define MAX/MIN precision of TIMESTAMP type according to PostgreSQL docs:
+    // https://www.postgresql.org/docs/12/datatype-datetime.html
+    private static final int MAX_TIMESTAMP_PRECISION = 6;
+    private static final int MIN_TIMESTAMP_PRECISION = 1;
+
+    // Define MAX/MIN precision of DECIMAL type according to PostgreSQL docs:
+    // https://www.postgresql.org/docs/12/datatype-numeric.html#DATATYPE-NUMERIC-DECIMAL
+    private static final int MAX_DECIMAL_PRECISION = 1000;
+    private static final int MIN_DECIMAL_PRECISION = 1;
+
+    @Override
+    public String getLimitClause(long limit) {
+        return "LIMIT " + limit;
+    }
+
+    /** Postgres upsert query. It uses ON CONFLICT ... DO UPDATE SET ... to upsert into Postgres. */
+    @Override
+    public Optional<String> getUpsertStatement(
+            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+        String uniqueColumns =
+                Arrays.stream(uniqueKeyFields)
+                        .map(this::quoteIdentifier)
+                        .collect(Collectors.joining(", "));
+        String updateClause =
+                Arrays.stream(fieldNames)
+                        .map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
+                        .collect(Collectors.joining(", "));
+        return Optional.of(
+                getInsertIntoStatement(tableName, fieldNames)
+                        + " ON CONFLICT ("
+                        + uniqueColumns
+                        + ")"
+                        + " DO UPDATE SET "
+                        + updateClause);
+    }
+
+    @Override
+    public String quoteIdentifier(String identifier) {
+        return identifier;
+    }
+
+    @Override
+    public Optional<Range> decimalPrecisionRange() {
+        return Optional.of(Range.of(MIN_DECIMAL_PRECISION, MAX_DECIMAL_PRECISION));
+    }
+
+    @Override
+    public Optional<Range> timestampPrecisionRange() {
+        return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION));
+    }
+
+    @Override
+    public Set<LogicalTypeRoot> supportedTypes() {
+        // The data types used in PostgreSQL are listed at:
+        // https://www.postgresql.org/docs/12/datatype.html
+
+        // TODO: We can't convert the BINARY data type to
+        //  PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in
+        //  LegacyTypeInfoDataTypeConverter.
+
+        return EnumSet.of(
+                LogicalTypeRoot.CHAR,
+                LogicalTypeRoot.VARCHAR,
+                LogicalTypeRoot.BOOLEAN,
+                LogicalTypeRoot.VARBINARY,
+                LogicalTypeRoot.DECIMAL,
+                LogicalTypeRoot.TINYINT,
+                LogicalTypeRoot.SMALLINT,
+                LogicalTypeRoot.INTEGER,
+                LogicalTypeRoot.BIGINT,
+                LogicalTypeRoot.FLOAT,
+                LogicalTypeRoot.DOUBLE,
+                LogicalTypeRoot.DATE,
+                LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE,
+                LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
+                LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
+                LogicalTypeRoot.ARRAY);
+    }
+}
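
For reference, the upsert clause assembled by this base dialect renders as plain `INSERT ... ON CONFLICT` SQL. Below is a minimal sketch of the resulting statement for a hypothetical `person(id, name)` table keyed on `id`, assuming `getInsertIntoStatement` emits one named placeholder per field (`:field`):

```java
import java.util.Arrays;
import java.util.stream.Collectors;

public class UpsertStatementSketch {
    public static void main(String[] args) {
        String tableName = "person"; // hypothetical table
        String[] fieldNames = {"id", "name"};
        String[] uniqueKeyFields = {"id"};

        // quoteIdentifier is the identity function in this dialect family,
        // so identifiers appear unquoted.
        String columns = String.join(", ", fieldNames);
        String placeholders =
                Arrays.stream(fieldNames).map(f -> ":" + f).collect(Collectors.joining(", "));
        String updateClause =
                Arrays.stream(fieldNames)
                        .map(f -> f + "=EXCLUDED." + f)
                        .collect(Collectors.joining(", "));

        String upsert =
                "INSERT INTO " + tableName + "(" + columns + ") VALUES (" + placeholders + ")"
                        + " ON CONFLICT (" + String.join(", ", uniqueKeyFields) + ")"
                        + " DO UPDATE SET " + updateClause;

        // Prints: INSERT INTO person(id, name) VALUES (:id, :name)
        //   ON CONFLICT (id) DO UPDATE SET id=EXCLUDED.id, name=EXCLUDED.name
        System.out.println(upsert);
    }
}
```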
diff --git a/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory b/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory
index 4d12a47..e5a05b7 100644
--- a/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory
+++ b/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory
@@ -18,3 +18,4 @@
 org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresDialectFactory
 org.apache.flink.connector.jdbc.databases.oracle.dialect.OracleDialectFactory
 org.apache.flink.connector.jdbc.databases.sqlserver.dialect.SqlServerDialectFactory
+org.apache.flink.connector.jdbc.databases.cratedb.dialect.CrateDBDialectFactory
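
The service file registers the new factory with the JVM's standard `ServiceLoader` mechanism, which is how the connector discovers dialects at runtime. A hedged sketch of that discovery follows; the local `JdbcDialectFactory` interface is a simplified stand-in for the connector's:

```java
import java.util.ServiceLoader;

public class DialectDiscoverySketch {
    /** Simplified stand-in for the connector's JdbcDialectFactory interface. */
    public interface JdbcDialectFactory {
        boolean acceptsURL(String url);
    }

    public static void main(String[] args) {
        // With a META-INF/services entry like the one added above on the
        // classpath, each listed factory class is instantiated reflectively.
        for (JdbcDialectFactory factory : ServiceLoader.load(JdbcDialectFactory.class)) {
            System.out.println(factory.getClass().getName());
        }
    }
}
```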
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalogITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalogITCase.java
new file mode 100644
index 0000000..11bf61d
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalogITCase.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.cratedb.catalog;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.apache.flink.connector.jdbc.databases.cratedb.catalog.CrateDBCatalog.DEFAULT_DATABASE;
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** E2E test for {@link CrateDBCatalog}. */
+class CrateDBCatalogITCase extends CrateDBCatalogTestBase {
+
+    private TableEnvironment tEnv;
+
+    @BeforeEach
+    void setup() {
+        this.tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+        tEnv.getConfig().set(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+
+        // use CrateDB catalog
+        tEnv.registerCatalog(TEST_CATALOG_NAME, catalog);
+        tEnv.useCatalog(TEST_CATALOG_NAME);
+    }
+
+    @Test
+    void testSelectField() {
+        List<Row> results =
+                CollectionUtil.iteratorToList(
+                        tEnv.sqlQuery(String.format("select id from %s", TABLE1))
+                                .execute()
+                                .collect());
+        assertThat(results).hasToString("[+I[1]]");
+    }
+
+    @Test
+    void testWithoutSchema() {
+        List<Row> results =
+                CollectionUtil.iteratorToList(
+                        tEnv.sqlQuery(String.format("select * from %s", TABLE1))
+                                .execute()
+                                .collect());
+        assertThat(results).hasToString("[+I[1]]");
+    }
+
+    @Test
+    void testWithSchema() {
+        List<Row> results =
+                CollectionUtil.iteratorToList(
+                        tEnv.sqlQuery(
+                                        String.format(
+                                                "select * from `%s`",
+                                                CrateDBTablePath.fromFlinkTableName(TABLE1)))
+                                .execute()
+                                .collect());
+        assertThat(results).hasToString("[+I[1]]");
+    }
+
+    @Test
+    void testFullPath() {
+        List<Row> results =
+                CollectionUtil.iteratorToList(
+                        tEnv.sqlQuery(
+                                        String.format(
+                                                "select * from %s.%s.`%s`",
+                                                TEST_CATALOG_NAME,
+                                                DEFAULT_DATABASE,
+                                                CrateDBTablePath.fromFlinkTableName(TABLE1)))
+                                .execute()
+                                .collect());
+        assertThat(results).hasToString("[+I[1]]");
+    }
+
+    @Test
+    void testGroupByInsert() throws Exception {
+        tEnv.executeSql(
+                        String.format(
+                                "insert into `%s`"
+                                        + "select `int`, `short`, max(`long`), max(`real`), "
+                                        + "max(`double`), max(`boolean`), "
+                                        + "max(`text`), max(`timestamp`) "
+                                        + "from `%s` group by `int`, `short`",
+                                TABLE_TARGET_PRIMITIVE, TABLE_PRIMITIVE_TYPE))
+                .await();
+        executeSQL(String.format("REFRESH TABLE doc.%s", TABLE_TARGET_PRIMITIVE));
+        List<Row> results =
+                CollectionUtil.iteratorToList(
+                        tEnv.sqlQuery(String.format("select * from `%s`", TABLE_TARGET_PRIMITIVE))
+                                .execute()
+                                .collect());
+        assertThat(results)
+                .hasToString("[+I[1, 3, 4, 5.5, 6.6, true, b, 2016-06-22T19:10:25.123]]");
+    }
+
+    @Test
+    void testPrimitiveTypes() {
+        List<Row> results =
+                CollectionUtil.iteratorToList(
+                        tEnv.sqlQuery(String.format("select * from %s", TABLE_PRIMITIVE_TYPE))
+                                .execute()
+                                .collect());
+
+        assertThat(results)
+                .hasToString(
+                        "[+I[1, 3, 3, 4, 4, 5.5, 5.5, 6.6, 6.6, true, a, b, c, d  , e, 192.168.0.100, 2016-06-22T19:10:25.123]]");
+    }
+
+    @Test
+    void testArrayTypes() {
+        List<Row> results =
+                CollectionUtil.iteratorToList(
+                        tEnv.sqlQuery(String.format("select * from %s", TABLE_ARRAY_TYPE))
+                                .execute()
+                                .collect());
+
+        assertThat(results)
+                .hasToString(
+                        "[+I[[1, 2, 3], [3, 4, 5], [3, 4, 5], [4, 5, 6], [4, 5, 6], [5.5, 6.6, 7.7], [5.5, 6.6, 7.7], [6.6, 7.7, 8.8], [6.6, 7.7, 8.8], [true, false, true], [a, b, c], [a, b, c], [b, c, d], [b  , c  , d  ], [b, c, d], [0:0:0:0:0:ffff:c0a8:64, 10.2.5.28, 127.0.0.6], [2016-06-22T19:10:25.123, 2019-06-22T11:22:33.987], null]]");
+    }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalogTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalogTest.java
new file mode 100644
index 0000000..fbf5916
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalogTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.cratedb.catalog;
+
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link CrateDBCatalog}. */
+class CrateDBCatalogTest extends CrateDBCatalogTestBase {
+
+    // ------ databases ------
+
+    @Test
+    void testGetDb_DatabaseNotExistException() {
+        assertThatThrownBy(() -> catalog.getDatabase("nonexistent"))
+                .isInstanceOf(DatabaseNotExistException.class)
+                .hasMessageContaining("Database nonexistent does not exist in Catalog");
+    }
+
+    @Test
+    void testListDatabases() {
+        Assertions.assertThat(catalog.listDatabases()).containsExactly("crate");
+    }
+
+    @Test
+    void testDbExists() {
+        Assertions.assertThat(catalog.databaseExists("nonexistent")).isFalse();
+        Assertions.assertThat(catalog.databaseExists(CrateDBCatalog.DEFAULT_DATABASE)).isTrue();
+    }
+
+    // ------ tables ------
+
+    @Test
+    void testListTables() throws DatabaseNotExistException {
+        List<String> actual = catalog.listTables(CrateDBCatalog.DEFAULT_DATABASE);
+
+        assertThat(actual)
+                .containsExactly(
+                        "doc.array_table",
+                        "doc.primitive_table",
+                        "doc.t1",
+                        "doc.t2",
+                        "doc.target_primitive_table",
+                        "test_schema.t3");
+    }
+
+    @Test
+    void testListTables_DatabaseNotExistException() {
+        assertThatThrownBy(() -> catalog.listTables("CrateDB"))
+                .isInstanceOf(DatabaseNotExistException.class);
+    }
+
+    @Test
+    void testTableExists() {
+        Assertions.assertThat(
+                        catalog.tableExists(
+                                new ObjectPath(CrateDBCatalog.DEFAULT_DATABASE, "nonexist")))
+                .isFalse();
+
+        Assertions.assertThat(
+                        catalog.tableExists(
+                                new ObjectPath(CrateDBCatalog.DEFAULT_DATABASE, TABLE1)))
+                .isTrue();
+        Assertions.assertThat(
+                        catalog.tableExists(
+                                new ObjectPath(CrateDBCatalog.DEFAULT_DATABASE, "test_schema.t3")))
+                .isTrue();
+    }
+
+    @Test
+    void testGetTables_TableNotExistException() {
+        assertThatThrownBy(
+                        () ->
+                                catalog.getTable(
+                                        new ObjectPath(
+                                                CrateDBCatalog.DEFAULT_DATABASE,
+                                                CrateDBTablePath.toFlinkTableName(
+                                                        TEST_SCHEMA, "anytable"))))
+                .isInstanceOf(TableNotExistException.class);
+    }
+
+    @Test
+    void testGetTables_TableNotExistException_NoSchema() {
+        assertThatThrownBy(
+                        () ->
+                                catalog.getTable(
+                                        new ObjectPath(
+                                                CrateDBCatalog.DEFAULT_DATABASE,
+                                                CrateDBTablePath.toFlinkTableName(
+                                                        "nonexistschema", "anytable"))))
+                .isInstanceOf(TableNotExistException.class);
+    }
+
+    @Test
+    void testGetTables_TableNotExistException_NoDb() {
+        assertThatThrownBy(
+                        () ->
+                                catalog.getTable(
+                                        new ObjectPath(
+                                                "nonexistdb",
+                                                CrateDBTablePath.toFlinkTableName(
+                                                        TEST_SCHEMA, "anytable"))))
+                .isInstanceOf(TableNotExistException.class);
+    }
+
+    @Test
+    void testGetTable() throws TableNotExistException {
+        // test crate.doc.t1 and crate.doc.t2
+        Schema schema = getSimpleTable().schema;
+        CatalogBaseTable table =
+                catalog.getTable(new ObjectPath(CrateDBCatalog.DEFAULT_DATABASE, TABLE1));
+
+        assertThat(table.getUnresolvedSchema()).isEqualTo(schema);
+
+        table = catalog.getTable(new ObjectPath(CrateDBCatalog.DEFAULT_DATABASE, "doc.t1"));
+
+        assertThat(table.getUnresolvedSchema()).isEqualTo(schema);
+
+        table = catalog.getTable(new ObjectPath(CrateDBCatalog.DEFAULT_DATABASE, TABLE2));
+
+        assertThat(table.getUnresolvedSchema()).isEqualTo(schema);
+
+        table = catalog.getTable(new ObjectPath(CrateDBCatalog.DEFAULT_DATABASE, "doc.t2"));
+
+        assertThat(table.getUnresolvedSchema()).isEqualTo(schema);
+
+        // test crate.test_schema.t3
+        table =
+                catalog.getTable(
+                        new ObjectPath(CrateDBCatalog.DEFAULT_DATABASE, TEST_SCHEMA + ".t3"));
+
+        assertThat(table.getUnresolvedSchema()).isEqualTo(schema);
+    }
+
+    @Test
+    void testPrimitiveDataTypes() throws TableNotExistException {
+        CatalogBaseTable table =
+                catalog.getTable(
+                        new ObjectPath(CrateDBCatalog.DEFAULT_DATABASE, TABLE_PRIMITIVE_TYPE));
+
+        assertThat(table.getUnresolvedSchema())
+                .hasToString(sanitizeSchemaSQL(getPrimitiveTable().schema.toString()));
+    }
+
+    @Test
+    void testArrayDataTypes() throws TableNotExistException {
+        CatalogBaseTable table =
+                catalog.getTable(new ObjectPath(CrateDBCatalog.DEFAULT_DATABASE, TABLE_ARRAY_TYPE));
+
+        assertThat(table.getUnresolvedSchema())
+                .hasToString(sanitizeSchemaSQL(getArrayTable().schema.toString()));
+    }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalogTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalogTestBase.java
new file mode 100644
index 0000000..9a9fdd0
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBCatalogTestBase.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.cratedb.catalog;
+
+import org.apache.flink.connector.jdbc.testutils.JdbcITCaseBase;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.TimeZone;
+
+import static org.apache.flink.connector.jdbc.testutils.databases.cratedb.CrateDBDatabase.CONTAINER;
+
+/** Test base for {@link CrateDBCatalog}. */
+class CrateDBCatalogTestBase implements JdbcITCaseBase, CrateDBTestBase {
+
+    public static final Logger LOG = LoggerFactory.getLogger(CrateDBCatalogTestBase.class);
+
+    protected static final String TEST_CATALOG_NAME = "mycratedb";
+    protected static final String TEST_USERNAME = CONTAINER.getUsername();
+    protected static final String TEST_PWD = CONTAINER.getPassword();
+    protected static final String TEST_SCHEMA = "test_schema";
+    protected static final String TABLE1 = "t1";
+    protected static final String TABLE2 = "t2";
+    protected static final String TABLE3 = "t3";
+    protected static final String TABLE_PRIMITIVE_TYPE = "primitive_table";
+    protected static final String TABLE_TARGET_PRIMITIVE = "target_primitive_table";
+    protected static final String TABLE_ARRAY_TYPE = "array_table";
+
+    protected static String baseUrl;
+    protected static CrateDBCatalog catalog;
+
+    @BeforeAll
+    static void init() throws SQLException {
+        // For deterministic timestamp field results
+        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
+
+        // jdbc:crate://localhost:50807/crate?user=crate
+        String jdbcUrl = CONTAINER.getJdbcUrl();
+        // jdbc:crate://localhost:50807/
+        baseUrl = jdbcUrl.substring(0, jdbcUrl.lastIndexOf("/"));
+
+        catalog =
+                new CrateDBCatalog(
+                        Thread.currentThread().getContextClassLoader(),
+                        TEST_CATALOG_NAME,
+                        CrateDBCatalog.DEFAULT_DATABASE,
+                        TEST_USERNAME,
+                        TEST_PWD,
+                        baseUrl);
+
+        // create test tables
+        // table: crate.doc.t1
+        // table: crate.doc.t2
+        createTable(CrateDBTablePath.fromFlinkTableName(TABLE1), getSimpleTable().crateDBSchemaSql);
+        createTable(CrateDBTablePath.fromFlinkTableName(TABLE2), getSimpleTable().crateDBSchemaSql);
+
+        // table: crate.test_schema.t3
+        createTable(new CrateDBTablePath(TEST_SCHEMA, TABLE3), getSimpleTable().crateDBSchemaSql);
+        createTable(
+                CrateDBTablePath.fromFlinkTableName(TABLE_PRIMITIVE_TYPE),
+                getPrimitiveTable().crateDBSchemaSql);
+        createTable(
+                CrateDBTablePath.fromFlinkTableName(TABLE_TARGET_PRIMITIVE),
+                getTargetPrimitiveTable().crateDBSchemaSql);
+        createTable(
+                CrateDBTablePath.fromFlinkTableName(TABLE_ARRAY_TYPE),
+                getArrayTable().crateDBSchemaSql);
+
+        executeSQL(
+                String.format("insert into doc.%s values (%s);", TABLE1, getSimpleTable().values));
+        executeSQL(
+                String.format(
+                        "insert into doc.%s values (%s);",
+                        TABLE_PRIMITIVE_TYPE, getPrimitiveTable().values));
+        executeSQL(
+                String.format(
+                        "insert into doc.%s values (%s);",
+                        TABLE_ARRAY_TYPE, getArrayTable().values));
+    }
+
+    public static void createTable(CrateDBTablePath tablePath, String tableSchemaSql)
+            throws SQLException {
+        executeSQL(String.format("CREATE TABLE %s(%s);", tablePath.getFullPath(), tableSchemaSql));
+        executeSQL(String.format("REFRESH TABLE %s", tablePath.getFullPath()));
+    }
+
+    public static void executeSQL(String sql) throws SQLException {
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                String.format("%s/%s", baseUrl, CrateDBCatalog.DEFAULT_DATABASE),
+                                TEST_USERNAME,
+                                TEST_PWD);
+                Statement statement = conn.createStatement()) {
+            statement.executeUpdate(sql);
+        }
+    }
+
+    /** Object holding a schema, the corresponding CrateDB schema SQL, and sample values. */
+    public static class TestTable {
+        Schema schema;
+        String crateDBSchemaSql;
+        String values;
+
+        public TestTable(Schema schema, String crateDBSchemaSql, String values) {
+            this.schema = schema;
+            this.crateDBSchemaSql = crateDBSchemaSql;
+            this.values = values;
+        }
+    }
+
+    public static TestTable getSimpleTable() {
+        return new TestTable(
+                Schema.newBuilder().column("id", DataTypes.INT()).build(), "id integer", "1");
+    }
+
+    // TODO: add back timestamptz and time types.
+    //  Flink currently doesn't support converting time's precision, failing with
+    //  TableException: Unsupported conversion from data type 'TIME(6)' (conversion class:
+    //  java.sql.Time) to type information. Only data types that originated from type
+    //  information fully support a reverse conversion.
+    public static TestTable getPrimitiveTable() {
+        return new TestTable(
+                Schema.newBuilder()
+                        .column("int", DataTypes.INT().notNull())
+                        .column("short", DataTypes.SMALLINT().notNull())
+                        .column("smallint", DataTypes.SMALLINT())
+                        .column("long", DataTypes.BIGINT())
+                        .column("bigint", DataTypes.BIGINT())
+                        .column("float", DataTypes.FLOAT())
+                        .column("real", DataTypes.FLOAT())
+                        .column("double", DataTypes.DOUBLE())
+                        .column("double_precision", DataTypes.DOUBLE())
+                        .column("boolean", DataTypes.BOOLEAN())
+                        .column("string", DataTypes.STRING())
+                        .column("text", DataTypes.STRING())
+                        .column("char", DataTypes.CHAR(1))
+                        .column("character", DataTypes.CHAR(3))
+                        .column("character_varying", DataTypes.VARCHAR(20))
+                        .column("ip", DataTypes.STRING())
+                        .column("timestamp", DataTypes.TIMESTAMP(6))
+                        //  .column("timestamptz", DataTypes.TIMESTAMP_WITH_TIME_ZONE(6))
+                        .primaryKeyNamed("primitive_table_pk", "short", "int")
+                        .build(),
+                "int integer, "
+                        + "short short, "
+                        + "smallint smallint, "
+                        + "long long, "
+                        + "bigint bigint, "
+                        + "float float, "
+                        + "real real, "
+                        + "double double, "
+                        + "double_precision double precision, "
+                        + "boolean boolean, "
+                        + "string string, "
+                        + "text text, "
+                        + "char char, "
+                        + "character character(3), "
+                        + "character_varying character varying(20), "
+                        + "ip ip, "
+                        + "timestamp timestamp, "
+                        // + "timestamptz timestamptz, "
+                        + "PRIMARY KEY (short, int)",
+                // Values
+                "1,"
+                        + "3,"
+                        + "3,"
+                        + "4,"
+                        + "4,"
+                        + "5.5,"
+                        + "5.5,"
+                        + "6.6,"
+                        + "6.6,"
+                        + "true,"
+                        + "'a',"
+                        + "'b',"
+                        + "'c',"
+                        + "'d',"
+                        + "'e',"
+                        + "'0:0:0:0:0:ffff:c0a8:64',"
+                        + "'2016-06-22 19:10:25.123456'");
+        //  + "'2006-06-22 19:10:25.123456'");
+    }
+
+    // TODO: add back timestamptz once planner supports timestamp with timezone
+    public static TestTable getArrayTable() {
+        return new TestTable(
+                Schema.newBuilder()
+                        .column("int_arr", DataTypes.ARRAY(DataTypes.INT()))
+                        .column("short_arr", DataTypes.ARRAY(DataTypes.SMALLINT()))
+                        .column("smallint_arr", DataTypes.ARRAY(DataTypes.SMALLINT()))
+                        .column("long_arr", DataTypes.ARRAY(DataTypes.BIGINT()))
+                        .column("bigint_arr", DataTypes.ARRAY(DataTypes.BIGINT()))
+                        .column("float_arr", DataTypes.ARRAY(DataTypes.FLOAT()))
+                        .column("real_arr", DataTypes.ARRAY(DataTypes.FLOAT()))
+                        .column("double_arr", DataTypes.ARRAY(DataTypes.DOUBLE()))
+                        .column("double_precision_arr", DataTypes.ARRAY(DataTypes.DOUBLE()))
+                        .column("boolean_arr", DataTypes.ARRAY(DataTypes.BOOLEAN()))
+                        .column("string_arr", DataTypes.ARRAY(DataTypes.STRING()))
+                        .column("text_arr", DataTypes.ARRAY(DataTypes.STRING()))
+                        .column("char_arr", DataTypes.ARRAY(DataTypes.CHAR(1)))
+                        .column("character_arr", DataTypes.ARRAY(DataTypes.CHAR(3)))
+                        .column("character_varying_arr", DataTypes.ARRAY(DataTypes.VARCHAR(20)))
+                        .column("ip", DataTypes.ARRAY(DataTypes.STRING()))
+                        .column("timestamp_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP(6)))
+                        // .column("timestamptz_arr",
+                        // DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(4)))
+                        .column("null_text_arr", DataTypes.ARRAY(DataTypes.STRING()))
+                        .build(),
+                "int_arr integer[], "
+                        + "short_arr short[],"
+                        + "smallint_arr smallint[],"
+                        + "long_arr long[], "
+                        + "bigint_arr bigint[], "
+                        + "float_arr float[], "
+                        + "real_arr real[], "
+                        + "double_arr double[], "
+                        + "double_precision_arr double precision[], "
+                        + "boolean_arr boolean[], "
+                        + "string_arr string[], "
+                        + "text_arr text[], "
+                        + "char_arr char[], "
+                        + "character_arr character(3)[], "
+                        + "character_varying_arr character varying(20)[],"
+                        + "ip string[],"
+                        + "timestamp_arr timestamp[], "
+                        // + "timestamptz_arr timestamptz[], "
+                        + "null_text_arr text[]",
+                // Values
+                "[1,2,3],"
+                        + "[3,4,5],"
+                        + "[3,4,5],"
+                        + "[4,5,6],"
+                        + "[4,5,6],"
+                        + "[5.5,6.6,7.7],"
+                        + "[5.5,6.6,7.7],"
+                        + "[6.6,7.7,8.8],"
+                        + "[6.6,7.7,8.8],"
+                        + "[true,false,true],"
+                        + "['a','b','c'],"
+                        + "['a','b','c'],"
+                        + "['b','c','d'],"
+                        + "['b','c','d'],"
+                        + "['b','c','d'],"
+                        + "['0:0:0:0:0:ffff:c0a8:64', '10.2.5.28', '127.0.0.6'],"
+                        + "['2016-06-22 19:10:25.123456', '2019-06-22 11:22:33.987654'],"
+                        // + "['2006-06-22 19:10:25.123456', '2019-06-22 11:22:33.987654'],"
+                        + "NULL");
+    }
+
+    public static TestTable getTargetPrimitiveTable() {
+        return new TestTable(
+                Schema.newBuilder()
+                        .column("int", DataTypes.INT().notNull())
+                        .column("short", DataTypes.SMALLINT().notNull())
+                        .column("long", DataTypes.BIGINT())
+                        .column("real", DataTypes.FLOAT())
+                        .column("double", DataTypes.DOUBLE())
+                        .column("boolean", DataTypes.BOOLEAN())
+                        .column("text", DataTypes.STRING())
+                        .column("timestamp", DataTypes.TIMESTAMP(6))
+                        .build(),
+                "int integer, "
+                        + "short short, "
+                        + "long long, "
+                        + "real real, "
+                        + "double double, "
+                        + "boolean boolean, "
+                        + "text text, "
+                        + "timestamp timestamp",
+                // Values
+                null);
+    }
+
+    protected static String sanitizeSchemaSQL(String schemaSQL) {
+        return schemaSQL
+                .replaceAll("CHAR\\(\\d+\\)", "CHAR(2147483647)")
+                .replaceAll("VARCHAR\\(\\d+\\)", "STRING");
+    }
+}
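
The test base inserts rows and then issues `REFRESH TABLE` because CrateDB makes writes visible to queries only after a refresh; the tests refresh explicitly rather than waiting for the periodic one. A minimal JDBC sketch of the same pattern (host, port, and credentials are illustrative test defaults, not fixed values):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class RefreshSketch {
    public static void main(String[] args) throws Exception {
        try (Connection conn =
                        DriverManager.getConnection(
                                "jdbc:crate://localhost:5432/crate", "crate", "crate");
                Statement stmt = conn.createStatement()) {
            stmt.executeUpdate("INSERT INTO doc.t1 VALUES (1)");
            stmt.executeUpdate("REFRESH TABLE doc.t1"); // make the row visible to reads
        }
    }
}
```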
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBTablePathTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBTablePathTest.java
new file mode 100644
index 0000000..76af085
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBTablePathTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.cratedb.catalog;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link CrateDBTablePath}. */
+class CrateDBTablePathTest {
+
+    @Test
+    void testToFlinkTableName() {
+        assertThat(CrateDBTablePath.toFlinkTableName("my_schema", "my_table"))
+                .isEqualTo("my_schema.my_table");
+        assertThat(CrateDBTablePath.toFlinkTableName("crate.my_schema", "my_table"))
+                .isEqualTo("crate.my_schema.my_table");
+        assertThatThrownBy(() -> CrateDBTablePath.toFlinkTableName("", "my_table"))
+                .isExactlyInstanceOf(IllegalArgumentException.class)
+                .hasMessage("Schema name is not valid. Null or empty is not allowed");
+    }
+
+    @Test
+    void testFromFlinkTableName() {
+        assertThat(CrateDBTablePath.fromFlinkTableName("my_schema.my_table"))
+                .isEqualTo(new CrateDBTablePath("my_schema", "my_table"));
+        assertThat(CrateDBTablePath.fromFlinkTableName("my_table"))
+                .isEqualTo(new CrateDBTablePath("doc", "my_table"));
+        assertThatThrownBy(() -> CrateDBTablePath.fromFlinkTableName("crate.doc.my_table"))
+                .isExactlyInstanceOf(IllegalArgumentException.class)
+                .hasMessage("Table name 'crate.doc.my_table' is not valid. The parsed length is 3");
+        assertThatThrownBy(() -> CrateDBTablePath.fromFlinkTableName(""))
+                .isExactlyInstanceOf(IllegalArgumentException.class)
+                .hasMessage("Table name is not valid. Null or empty is not allowed");
+    }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBTestBase.java
new file mode 100644
index 0000000..96f2c35
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/cratedb/catalog/CrateDBTestBase.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.cratedb.catalog;
+
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.DatabaseTest;
+import org.apache.flink.connector.jdbc.testutils.databases.cratedb.CrateDBDatabase;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/** Base class for CrateDB testing. */
+@ExtendWith(CrateDBDatabase.class)
+public interface CrateDBTestBase extends DatabaseTest {
+
+    @Override
+    default DatabaseMetadata getMetadata() {
+        return CrateDBDatabase.getMetadata();
+    }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresTablePathTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresTablePathTest.java
index 6798834..38625be 100644
--- a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresTablePathTest.java
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/postgres/catalog/PostgresTablePathTest.java
@@ -21,12 +21,33 @@
 import org.junit.jupiter.api.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link PostgresTablePath}. */
 class PostgresTablePathTest {
     @Test
+    void testToFlinkTableName() {
+        assertThat(PostgresTablePath.toFlinkTableName("my_schema", "my_table"))
+                .isEqualTo("my_schema.my_table");
+        assertThat(PostgresTablePath.toFlinkTableName("postgres.my_schema", "my_table"))
+                .isEqualTo("postgres.my_schema.my_table");
+        assertThatThrownBy(() -> PostgresTablePath.toFlinkTableName("", "my_table"))
+                .isExactlyInstanceOf(IllegalArgumentException.class)
+                .hasMessage("Schema name is not valid. Null or empty is not allowed");
+    }
+
+    @Test
     void testFromFlinkTableName() {
-        assertThat(PostgresTablePath.fromFlinkTableName("public.topic"))
-                .isEqualTo(new PostgresTablePath("public", "topic"));
+        assertThat(PostgresTablePath.fromFlinkTableName("my_schema.my_table"))
+                .isEqualTo(new PostgresTablePath("my_schema", "my_table"));
+        assertThat(PostgresTablePath.fromFlinkTableName("my_table"))
+                .isEqualTo(new PostgresTablePath("public", "my_table"));
+        assertThatThrownBy(() -> PostgresTablePath.fromFlinkTableName("postgres.public.my_table"))
+                .isExactlyInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "Table name 'postgres.public.my_table' is not valid. The parsed length is 3");
+        assertThatThrownBy(() -> PostgresTablePath.fromFlinkTableName(""))
+                .isExactlyInstanceOf(IllegalArgumentException.class)
+                .hasMessage("Table name is not valid. Null or empty is not allowed");
     }
 }
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/cratedb/CrateDBDialectTypeTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/cratedb/CrateDBDialectTypeTest.java
new file mode 100644
index 0000000..563adb2
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/cratedb/CrateDBDialectTypeTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect.cratedb;
+
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectTypeTest;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** The CrateDB params for {@link JdbcDialectTypeTest}. */
+public class CrateDBDialectTypeTest extends JdbcDialectTypeTest {
+
+    @Override
+    protected String testDialect() {
+        return "crate";
+    }
+
+    @Override
+    protected List<TestItem> testData() {
+        return Arrays.asList(
+                createTestItem("CHAR"),
+                createTestItem("VARCHAR"),
+                createTestItem("BOOLEAN"),
+                createTestItem("TINYINT"),
+                createTestItem("SMALLINT"),
+                createTestItem("INTEGER"),
+                createTestItem("BIGINT"),
+                createTestItem("FLOAT"),
+                createTestItem("DOUBLE"),
+                createTestItem("DECIMAL(10, 4)"),
+                createTestItem("DECIMAL(38, 18)"),
+                createTestItem("DATE"),
+                createTestItem("TIME"),
+                createTestItem("TIMESTAMP(3)"),
+                createTestItem("TIMESTAMP WITHOUT TIME ZONE"),
+                createTestItem("ARRAY<INTEGER>"),
+
+                // Not valid data
+                createTestItem("BINARY", "The CrateDB dialect doesn't support type: BINARY(1)."),
+                createTestItem(
+                        "TIMESTAMP(9) WITHOUT TIME ZONE",
+                        "The precision of field 'f0' is out of the TIMESTAMP precision range [1, 6] supported by CrateDB dialect."),
+                createTestItem("TIMESTAMP_LTZ(3)", "Unsupported type:TIMESTAMP_LTZ(3)"));
+    }
+}
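
Since `testDialect()` returns "crate", the DDL the test generates points its JDBC URL at a `jdbc:crate://` scheme, which is what selects the CrateDB dialect. A hedged sketch of an equivalent user-facing table definition (URL and table name are illustrative):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CrateDBTableDdlSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // The jdbc:crate:// URL routes this table through the CrateDB dialect.
        tEnv.executeSql(
                "CREATE TABLE t (f0 TIMESTAMP(3)) WITH ("
                        + " 'connector' = 'jdbc',"
                        + " 'url' = 'jdbc:crate://localhost:5432/crate',"
                        + " 'table-name' = 'doc.t')");
    }
}
```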
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/cratedb/CrateDBDatabase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/cratedb/CrateDBDatabase.java
new file mode 100644
index 0000000..eae2148
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/cratedb/CrateDBDatabase.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils.databases.cratedb;
+
+import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.containers.wait.strategy.WaitStrategy;
+import org.testcontainers.utility.DockerImageName;
+
+import java.time.Duration;
+
+import static java.time.temporal.ChronoUnit.SECONDS;
+
+/** A CrateDB database for testing. */
+public class CrateDBDatabase extends DatabaseExtension {
+
+    private static final String CRATEDB = "crate:5.3.1";
+    private static final int CRATEDB_PG_PORT = 5432;
+    private static final int CRATEDB_HTTP_PORT = 4200;
+
+    private static final DockerImageName CRATEDB_DOCKER_IMAGE =
+            DockerImageName.parse(CRATEDB).asCompatibleSubstituteFor("postgres");
+    private static final WaitStrategy WAIT_STRATEGY =
+            Wait.forHttp("/")
+                    .forPort(CRATEDB_HTTP_PORT)
+                    .forStatusCode(200)
+                    .withStartupTimeout(Duration.of(60, SECONDS));
+
+    private static CrateDBMetadata metadata;
+
+    public static final CrateDBContainer CONTAINER =
+            new CrateDBContainer(CRATEDB_DOCKER_IMAGE)
+                    .withDatabaseName("crate")
+                    .withUsername("crate")
+                    .withPassword("crate")
+                    .withCommand("crate")
+                    .withEnv("TZ", "UTC") // For deterministic timestamp field results
+                    .waitingFor(WAIT_STRATEGY);
+
+    public static CrateDBMetadata getMetadata() {
+        if (!CONTAINER.isRunning()) {
+            throw new FlinkRuntimeException("Container is stopped.");
+        }
+        if (metadata == null) {
+            metadata = new CrateDBMetadata(CONTAINER);
+        }
+        return metadata;
+    }
+
+    @Override
+    protected DatabaseMetadata startDatabase() throws Exception {
+        CONTAINER.start();
+        return getMetadata();
+    }
+
+    @Override
+    protected void stopDatabase() throws Exception {
+        CONTAINER.stop();
+        metadata = null;
+    }
+
+    /**
+     * Workaround to use Testcontainers with the <a
+     * href="https://crate.io/docs/jdbc/en/latest/index.html">legacy CrateDB JDBC driver</a>.
+     */
+    public static class CrateDBContainer extends JdbcDatabaseContainer<CrateDBContainer> {
+
+        public static final String IMAGE = "crate";
+
+        private String databaseName = "crate";
+
+        private String username = "crate";
+
+        private String password = "crate";
+
+        public CrateDBContainer(final DockerImageName dockerImageName) {
+            super(dockerImageName);
+            dockerImageName.assertCompatibleWith(DockerImageName.parse(IMAGE));
+
+            this.waitStrategy = Wait.forHttp("/").forPort(CRATEDB_HTTP_PORT).forStatusCode(200);
+
+            addExposedPort(CRATEDB_PG_PORT);
+            addExposedPort(CRATEDB_HTTP_PORT);
+        }
+
+        @Override
+        public String getDriverClassName() {
+            return "io.crate.client.jdbc.CrateDriver";
+        }
+
+        @Override
+        public String getJdbcUrl() {
+            String additionalUrlParams = constructUrlParameters("?", "&");
+            return ("jdbc:crate://"
+                    + getHost()
+                    + ":"
+                    + getMappedPort(CRATEDB_PG_PORT)
+                    + "/"
+                    + databaseName
+                    + additionalUrlParams);
+        }
+
+        @Override
+        public String getDatabaseName() {
+            return databaseName;
+        }
+
+        @Override
+        public String getUsername() {
+            return username;
+        }
+
+        @Override
+        public String getPassword() {
+            return password;
+        }
+
+        @Override
+        public String getTestQueryString() {
+            return "SELECT 1";
+        }
+
+        @Override
+        public CrateDBContainer withDatabaseName(final String databaseName) {
+            this.databaseName = databaseName;
+            return self();
+        }
+
+        @Override
+        public CrateDBContainer withUsername(final String username) {
+            this.username = username;
+            return self();
+        }
+
+        @Override
+        public CrateDBContainer withPassword(final String password) {
+            this.password = password;
+            return self();
+        }
+
+        @Override
+        protected void waitUntilContainerStarted() {
+            getWaitStrategy().waitUntilReady(this);
+        }
+    }
+}
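
Outside the `DatabaseExtension` lifecycle, the container above can also be driven directly with Testcontainers; a minimal sketch mirroring `startDatabase()`/`stopDatabase()`:

```java
import org.apache.flink.connector.jdbc.testutils.databases.cratedb.CrateDBDatabase;

public class CrateDBContainerSketch {
    public static void main(String[] args) {
        CrateDBDatabase.CONTAINER.start();
        try {
            // e.g. jdbc:crate://localhost:<mapped-port>/crate
            System.out.println(CrateDBDatabase.CONTAINER.getJdbcUrl());
        } finally {
            CrateDBDatabase.CONTAINER.stop();
        }
    }
}
```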
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/cratedb/CrateDBMetadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/cratedb/CrateDBMetadata.java
new file mode 100644
index 0000000..b44792d
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/cratedb/CrateDBMetadata.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils.databases.cratedb;
+
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+
+import javax.sql.XADataSource;
+
+/** CrateDB Metadata. */
+public class CrateDBMetadata implements DatabaseMetadata {
+
+    private final String username;
+    private final String password;
+    private final String url;
+    private final String driver;
+    private final String version;
+
+    public CrateDBMetadata(CrateDBDatabase.CrateDBContainer container) {
+        this.username = container.getUsername();
+        this.password = container.getPassword();
+        this.url = container.getJdbcUrl();
+        this.driver = container.getDriverClassName();
+        this.version = container.getDockerImageName();
+    }
+
+    @Override
+    public String getJdbcUrl() {
+        return this.url;
+    }
+
+    @Override
+    public String getJdbcUrlWithCredentials() {
+        return String.format("%s&user=%s&password=%s", getJdbcUrl(), getUsername(), getPassword());
+    }
+
+    @Override
+    public String getUsername() {
+        return this.username;
+    }
+
+    @Override
+    public String getPassword() {
+        return this.password;
+    }
+
+    @Override
+    public XADataSource buildXaDataSource() {
+        throw new UnsupportedOperationException("CrateDB doesn't support XA");
+    }
+
+    @Override
+    public String getDriverClass() {
+        return this.driver;
+    }
+
+    @Override
+    public String getVersion() {
+        return version;
+    }
+}
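
A short sketch of how a test might consume this metadata to open a plain JDBC connection; all accessors shown are the `DatabaseMetadata` methods implemented above:

```java
import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;

import java.sql.Connection;
import java.sql.DriverManager;

public class MetadataConnectionSketch {
    static Connection open(DatabaseMetadata metadata) throws Exception {
        // Loads io.crate.client.jdbc.CrateDriver before connecting.
        Class.forName(metadata.getDriverClass());
        return DriverManager.getConnection(
                metadata.getJdbcUrl(), metadata.getUsername(), metadata.getPassword());
    }
}
```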