[FLINK-17887][table][connector] Improve interface of ScanFormatFactory and SinkFormatFactory


We improved the interfaces with the following changes:
1. Have a common interface DynamicTableSource.Context, and make Context of ScanTableSource and LookupTableSource extend it, and rename them to LookupContext and ScanContext
2. Change parameter of ScanFormat.createScanFormat from ScanTableSource.Context to DynamicTableSource.Context
3. Rename ScanFormat.createScanFormat to DecodingFormat#createRuntimeDecoder()
4. Rename SinkFormat.createSinkFormat to EncodingFormat#createRuntimeEncoder()
5. Rename ScanFormatFactory to DecodingFormatFactory
6. Rename SinkFormatFactory to EncodingFormatFactory

This closes #12320
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
index eadf659..bedfbef 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSink.java
@@ -25,7 +25,7 @@
 import org.apache.flink.streaming.connectors.elasticsearch6.RestClientFactory;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.SinkFunctionProvider;
 import org.apache.flink.table.data.RowData;
@@ -52,12 +52,12 @@
 	@VisibleForTesting
 	static final Elasticsearch7RequestFactory REQUEST_FACTORY = new Elasticsearch7RequestFactory();
 
-	private final SinkFormat<SerializationSchema<RowData>> format;
+	private final EncodingFormat<SerializationSchema<RowData>> format;
 	private final TableSchema schema;
 	private final Elasticsearch6Configuration config;
 
 	public Elasticsearch6DynamicSink(
-			SinkFormat<SerializationSchema<RowData>> format,
+			EncodingFormat<SerializationSchema<RowData>> format,
 			Elasticsearch6Configuration config,
 			TableSchema schema) {
 		this(format, config, schema, (ElasticsearchSink.Builder::new));
@@ -83,7 +83,7 @@
 	}
 
 	Elasticsearch6DynamicSink(
-			SinkFormat<SerializationSchema<RowData>> format,
+			EncodingFormat<SerializationSchema<RowData>> format,
 			Elasticsearch6Configuration config,
 			TableSchema schema,
 			ElasticSearchBuilderProvider builderProvider) {
@@ -111,7 +111,7 @@
 	@Override
 	public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
 		return () -> {
-			SerializationSchema<RowData> format = this.format.createSinkFormat(context, schema.toRowDataType());
+			SerializationSchema<RowData> format = this.format.createRuntimeEncoder(context, schema.toRowDataType());
 
 			final RowElasticsearchSinkFunction upsertFunction =
 				new RowElasticsearchSinkFunction(
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
index 65c90b5..c5d9c89 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkFactory.java
@@ -24,7 +24,7 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
@@ -84,7 +84,7 @@
 		ElasticsearchValidationUtils.validatePrimaryKey(tableSchema);
 		final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
 
-		final SinkFormat<SerializationSchema<RowData>> format = helper.discoverSinkFormat(
+		final EncodingFormat<SerializationSchema<RowData>> format = helper.discoverEncodingFormat(
 			SerializationFormatFactory.class,
 			FORMAT_OPTION);
 
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java
index df54147..1708efc 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch6DynamicSinkTest.java
@@ -28,7 +28,7 @@
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
@@ -65,7 +65,7 @@
 
 		BuilderProvider provider = new BuilderProvider();
 		final Elasticsearch6DynamicSink testSink = new Elasticsearch6DynamicSink(
-			new DummySinkFormat(),
+			new DummyEncodingFormat(),
 			new Elasticsearch6Configuration(getConfig(), this.getClass().getClassLoader()),
 			schema,
 			provider
@@ -141,9 +141,9 @@
 		}
 	}
 
-	private static class DummySinkFormat implements SinkFormat<SerializationSchema<RowData>> {
+	private static class DummyEncodingFormat implements EncodingFormat<SerializationSchema<RowData>> {
 		@Override
-		public SerializationSchema<RowData> createSinkFormat(
+		public SerializationSchema<RowData> createRuntimeEncoder(
 				DynamicTableSink.Context context,
 				DataType consumedDataType) {
 			return DummySerializationSchema.INSTANCE;
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java
index 4076b63..408673e 100644
--- a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java
+++ b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java
@@ -25,7 +25,7 @@
 import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.SinkFunctionProvider;
 import org.apache.flink.table.data.RowData;
@@ -52,12 +52,12 @@
 	@VisibleForTesting
 	static final Elasticsearch7RequestFactory REQUEST_FACTORY = new Elasticsearch7DynamicSink.Elasticsearch7RequestFactory();
 
-	private final SinkFormat<SerializationSchema<RowData>> format;
+	private final EncodingFormat<SerializationSchema<RowData>> format;
 	private final TableSchema schema;
 	private final Elasticsearch7Configuration config;
 
 	public Elasticsearch7DynamicSink(
-			SinkFormat<SerializationSchema<RowData>> format,
+			EncodingFormat<SerializationSchema<RowData>> format,
 			Elasticsearch7Configuration config,
 			TableSchema schema) {
 		this(format, config, schema, (ElasticsearchSink.Builder::new));
@@ -83,7 +83,7 @@
 	}
 
 	Elasticsearch7DynamicSink(
-			SinkFormat<SerializationSchema<RowData>> format,
+			EncodingFormat<SerializationSchema<RowData>> format,
 			Elasticsearch7Configuration config,
 			TableSchema schema,
 			ElasticSearchBuilderProvider builderProvider) {
@@ -111,7 +111,7 @@
 	@Override
 	public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
 		return () -> {
-			SerializationSchema<RowData> format = this.format.createSinkFormat(context, schema.toRowDataType());
+			SerializationSchema<RowData> format = this.format.createRuntimeEncoder(context, schema.toRowDataType());
 
 			final RowElasticsearchSinkFunction upsertFunction =
 				new RowElasticsearchSinkFunction(
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java
index 055989b..ae7a9fd 100644
--- a/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java
+++ b/flink-connectors/flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkFactory.java
@@ -24,7 +24,7 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
@@ -83,7 +83,7 @@
 
 		final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
 
-		final SinkFormat<SerializationSchema<RowData>> format = helper.discoverSinkFormat(
+		final EncodingFormat<SerializationSchema<RowData>> format = helper.discoverEncodingFormat(
 			SerializationFormatFactory.class,
 			FORMAT_OPTION);
 
diff --git a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java
index 466ede3..c972cee 100644
--- a/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java
+++ b/flink-connectors/flink-connector-elasticsearch7/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSinkTest.java
@@ -28,7 +28,7 @@
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
-import org.apache.flink.table.connector.format.SinkFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
@@ -65,7 +65,7 @@
 
 		BuilderProvider provider = new BuilderProvider();
 		final Elasticsearch7DynamicSink testSink = new Elasticsearch7DynamicSink(
-			new DummySinkFormat(),
+			new DummyEncodingFormat(),
 			new Elasticsearch7Configuration(getConfig(), this.getClass().getClassLoader()),
 			schema,
 			provider
@@ -141,9 +141,9 @@
 		}
 	}
 
-	private static class DummySinkFormat implements SinkFormat<SerializationSchema<RowData>> {
+	private static class DummyEncodingFormat implements EncodingFormat<SerializationSchema<RowData>> {
 		@Override
-		public SerializationSchema<RowData> createSinkFormat(
+		public SerializationSchema<RowData> createRuntimeEncoder(
 				DynamicTableSink.Context context,
 				DataType consumedDataType) {
 			return DummySerializationSchema.INSTANCE;