[FLINK-14645][connectors] Support new schema properties for SQL connectors
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase.java
index 4b1d2bc..ebf641e 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryBase.java
@@ -82,6 +82,7 @@
 import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT;
 import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_TYPE;
 import static org.apache.flink.table.descriptors.Schema.SCHEMA;
+import static org.apache.flink.table.descriptors.Schema.SCHEMA_DATA_TYPE;
 import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;
 import static org.apache.flink.table.descriptors.Schema.SCHEMA_TYPE;
 import static org.apache.flink.table.descriptors.StreamTableDescriptorValidator.UPDATE_MODE;
@@ -136,6 +137,7 @@
 		properties.add(CONNECTOR_CONNECTION_PATH_PREFIX);
 
 		// schema
+		properties.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE);
 		properties.add(SCHEMA + ".#." + SCHEMA_TYPE);
 		properties.add(SCHEMA + ".#." + SCHEMA_NAME);
 
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryTestBase.java b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryTestBase.java
index 0cc282c..fce8983 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryTestBase.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkFactoryTestBase.java
@@ -22,8 +22,8 @@
 import org.apache.flink.formats.json.JsonRowSerializationSchema;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.Host;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase.SinkOption;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.Types;
 import org.apache.flink.table.descriptors.Elasticsearch;
 import org.apache.flink.table.descriptors.Json;
 import org.apache.flink.table.descriptors.Schema;
@@ -106,10 +106,10 @@
 					.deriveSchema())
 			.withSchema(
 				new Schema()
-					.field(FIELD_KEY, Types.LONG())
-					.field(FIELD_FRUIT_NAME, Types.STRING())
-					.field(FIELD_COUNT, Types.DECIMAL())
-					.field(FIELD_TS, Types.SQL_TIMESTAMP()))
+					.field(FIELD_KEY, DataTypes.BIGINT())
+					.field(FIELD_FRUIT_NAME, DataTypes.STRING())
+					.field(FIELD_COUNT, DataTypes.DECIMAL(10, 4))
+					.field(FIELD_TS, DataTypes.TIMESTAMP(3)))
 			.inUpsertMode();
 
 		final Map<String, String> propertiesMap = testDesc.toProperties();
@@ -121,10 +121,10 @@
 
 	protected TableSchema createTestSchema() {
 		return TableSchema.builder()
-			.field(FIELD_KEY, Types.LONG())
-			.field(FIELD_FRUIT_NAME, Types.STRING())
-			.field(FIELD_COUNT, Types.DECIMAL())
-			.field(FIELD_TS, Types.SQL_TIMESTAMP())
+			.field(FIELD_KEY, DataTypes.BIGINT())
+			.field(FIELD_FRUIT_NAME, DataTypes.STRING())
+			.field(FIELD_COUNT, DataTypes.DECIMAL(10, 4))
+			.field(FIELD_TS, DataTypes.TIMESTAMP(3))
 			.build();
 	}