[FLINK-19823][table][fs-connector] Filesystem connector supports de/serialization schema

This closes #13957
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonFileSystemFormatFactory.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonFileSystemFormatFactory.java
deleted file mode 100644
index 831b8ca..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonFileSystemFormatFactory.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.formats.json;
-
-import org.apache.flink.api.common.io.DelimitedInputFormat;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.serialization.BulkWriter;
-import org.apache.flink.api.common.serialization.Encoder;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.factories.FileSystemFormatFactory;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.utils.PartitionPathUtils;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.formats.json.JsonFormatFactory.validateFormatOptions;
-import static org.apache.flink.formats.json.JsonOptions.FAIL_ON_MISSING_FIELD;
-import static org.apache.flink.formats.json.JsonOptions.IGNORE_PARSE_ERRORS;
-import static org.apache.flink.formats.json.JsonOptions.TIMESTAMP_FORMAT;
-
-/**
- * Factory to build reader/writer to read/write json format file.
- */
-public class JsonFileSystemFormatFactory implements FileSystemFormatFactory {
-
-	public static final String IDENTIFIER = "json";
-
-	@Override
-	public String factoryIdentifier() {
-		return IDENTIFIER;
-	}
-
-	@Override
-	public Set<ConfigOption<?>> requiredOptions() {
-		return new HashSet<>();
-	}
-
-	@Override
-	public Set<ConfigOption<?>> optionalOptions() {
-		Set<ConfigOption<?>> options = new HashSet<>();
-		options.add(FAIL_ON_MISSING_FIELD);
-		options.add(IGNORE_PARSE_ERRORS);
-		options.add(TIMESTAMP_FORMAT);
-		return options;
-	}
-
-	@Override
-	public InputFormat<RowData, ?> createReader(ReaderContext context) {
-		ReadableConfig options = context.getFormatOptions();
-		validateFormatOptions(options);
-		boolean failOnMissingField = options.get(FAIL_ON_MISSING_FIELD);
-		boolean ignoreParseErrors = options.get(IGNORE_PARSE_ERRORS);
-		TimestampFormat timestampOption = JsonOptions.getTimestampFormat(options);
-
-		RowType formatRowType = context.getFormatRowType();
-		JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
-			formatRowType,
-			new GenericTypeInfo(GenericRowData.class),
-			failOnMissingField,
-			ignoreParseErrors,
-			timestampOption);
-
-		String[] fieldNames = context.getSchema().getFieldNames();
-		List<String> projectFields = Arrays.stream(context.getProjectFields())
-			.mapToObj(idx -> fieldNames[idx])
-			.collect(Collectors.toList());
-		List<String> jsonFields = Arrays.stream(fieldNames)
-			.filter(field -> !context.getPartitionKeys().contains(field))
-			.collect(Collectors.toList());
-
-		int[] jsonSelectFieldToProjectFieldMapping = context.getFormatProjectFields().stream()
-			.mapToInt(projectFields::indexOf)
-			.toArray();
-		int[] jsonSelectFieldToJsonFieldMapping = context.getFormatProjectFields().stream()
-			.mapToInt(jsonFields::indexOf)
-			.toArray();
-
-		return new JsonInputFormat(
-			context.getPaths(),
-			context.getSchema().getFieldDataTypes(),
-			context.getSchema().getFieldNames(),
-			context.getProjectFields(),
-			context.getPartitionKeys(),
-			context.getDefaultPartName(),
-			context.getPushedDownLimit(),
-			jsonSelectFieldToProjectFieldMapping,
-			jsonSelectFieldToJsonFieldMapping,
-			deserializationSchema);
-	}
-
-	@Override
-	public Optional<Encoder<RowData>> createEncoder(WriterContext context) {
-		return Optional.of(new JsonRowDataEncoder(new JsonRowDataSerializationSchema(context.getFormatRowType(),
-			JsonOptions.getTimestampFormat(context.getFormatOptions()))));
-	}
-
-	@Override
-	public Optional<BulkWriter.Factory<RowData>> createBulkWriterFactory(WriterContext context) {
-		return Optional.empty();
-	}
-
-	/**
-	 * A {@link JsonInputFormat} is responsible to read {@link RowData} records
-	 * from json format files.
-	 */
-	public static class JsonInputFormat extends DelimitedInputFormat<RowData> {
-
-		private static final long serialVersionUID = 1L;
-
-		/**
-		 * Code of \r, used to remove \r from a line when the line ends with \r\n.
-		 */
-		private static final byte CARRIAGE_RETURN = (byte) '\r';
-
-		/**
-		 * Code of \n, used to identify if \n is used as delimiter.
-		 */
-		private static final byte NEW_LINE = (byte) '\n';
-
-		private final DataType[] fieldTypes;
-		private final String[] fieldNames;
-		private final int[] selectFields;
-		private final List<String> partitionKeys;
-		private final String defaultPartValue;
-		private final long limit;
-		private final int[] jsonSelectFieldToProjectFieldMapping;
-		private final int[] jsonSelectFieldToJsonFieldMapping;
-		private final JsonRowDataDeserializationSchema deserializationSchema;
-
-		private transient boolean end;
-		private transient long emitted;
-		// reuse object for per record
-		private transient GenericRowData rowData;
-
-		public JsonInputFormat(
-			Path[] filePaths,
-			DataType[] fieldTypes,
-			String[] fieldNames,
-			int[] selectFields,
-			List<String> partitionKeys,
-			String defaultPartValue,
-			long limit,
-			int[] jsonSelectFieldToProjectFieldMapping,
-			int[] jsonSelectFieldToJsonFieldMapping,
-			JsonRowDataDeserializationSchema deserializationSchema) {
-			super.setFilePaths(filePaths);
-			this.fieldTypes = fieldTypes;
-			this.fieldNames = fieldNames;
-			this.selectFields = selectFields;
-			this.partitionKeys = partitionKeys;
-			this.defaultPartValue = defaultPartValue;
-			this.limit = limit;
-			this.jsonSelectFieldToProjectFieldMapping = jsonSelectFieldToProjectFieldMapping;
-			this.jsonSelectFieldToJsonFieldMapping = jsonSelectFieldToJsonFieldMapping;
-			this.deserializationSchema = deserializationSchema;
-		}
-
-		@Override
-		public boolean supportsMultiPaths() {
-			return true;
-		}
-
-		@Override
-		public void open(FileInputSplit split) throws IOException {
-			super.open(split);
-			this.end = false;
-			this.emitted = 0L;
-			this.rowData = PartitionPathUtils.fillPartitionValueForRecord(fieldNames, fieldTypes, selectFields,
-				partitionKeys, currentSplit.getPath(), defaultPartValue);
-		}
-
-		@Override
-		public boolean reachedEnd() {
-			return emitted >= limit || end;
-		}
-
-		@Override
-		public RowData readRecord(RowData reuse, byte[] bytes, int offset, int numBytes) throws IOException {
-			// remove \r from a line when the line ends with \r\n
-			if (this.getDelimiter() != null && this.getDelimiter().length == 1
-				&& this.getDelimiter()[0] == NEW_LINE && offset + numBytes >= 1
-				&& bytes[offset + numBytes - 1] == CARRIAGE_RETURN) {
-				numBytes -= 1;
-			}
-			byte[] trimBytes = Arrays.copyOfRange(bytes, offset, offset + numBytes);
-			GenericRowData jsonRow = (GenericRowData) deserializationSchema.deserialize(trimBytes);
-
-			if (jsonRow == null) {
-				return null;
-			}
-
-			GenericRowData returnRecord = rowData;
-			for (int i = 0; i < jsonSelectFieldToJsonFieldMapping.length; i++) {
-				returnRecord.setField(jsonSelectFieldToProjectFieldMapping[i],
-					jsonRow.getField(jsonSelectFieldToJsonFieldMapping[i]));
-			}
-
-			emitted++;
-			return returnRecord;
-		}
-
-		@Override
-		public RowData nextRecord(RowData record) throws IOException {
-			while (true) {
-				if (readLine()) {
-					RowData row = readRecord(record, this.currBuffer, this.currOffset, this.currLen);
-					if (row == null) {
-						continue;
-					} else {
-						return row;
-					}
-				} else {
-					this.end = true;
-					return null;
-				}
-			}
-		}
-	}
-
-	/**
-	 * A {@link JsonRowDataEncoder} is responsible to encode a {@link RowData} to {@link java.io.OutputStream}
-	 * with json format.
-	 */
-	public static class JsonRowDataEncoder implements Encoder<RowData> {
-
-		private static final long serialVersionUID = 1L;
-		private static final String DEFAULT_LINE_DELIMITER = "\n";
-		private final JsonRowDataSerializationSchema serializationSchema;
-
-		public JsonRowDataEncoder(JsonRowDataSerializationSchema serializationSchema) {
-			this.serializationSchema = serializationSchema;
-		}
-
-		@Override
-		public void encode(RowData element, OutputStream stream) throws IOException {
-			stream.write(serializationSchema.serialize(element));
-			stream.write(DEFAULT_LINE_DELIMITER.getBytes(StandardCharsets.UTF_8));
-		}
-	}
-}
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-formats-kafka/flink-json-debezium/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 942b3dc..08f4657 100644
--- a/flink-formats-kafka/flink-json-debezium/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/flink-formats-kafka/flink-json-debezium/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.formats.json.JsonFileSystemFormatFactory
 org.apache.flink.formats.json.JsonFormatFactory
 org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory
 org.apache.flink.formats.json.canal.CanalJsonFormatFactory
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonBatchFileSystemITCase.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonBatchFileSystemITCase.java
index f1ee52b..7d17dbd 100644
--- a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonBatchFileSystemITCase.java
+++ b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonBatchFileSystemITCase.java
@@ -31,7 +31,7 @@
 import java.util.List;
 
 /**
- * ITCase to test json format for {@link JsonFileSystemFormatFactory}.
+ * ITCase to test json format for {@link JsonFormatFactory}.
  */
 public class JsonBatchFileSystemITCase extends BatchFileSystemITCaseBase {
 
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonFileCompactionITCase.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonFileCompactionITCase.java
new file mode 100644
index 0000000..dd1a8cc
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonFileCompactionITCase.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.table.planner.runtime.stream.sql.FileCompactionITCaseBase;
+
+/**
+ * Compaction it case for json.
+ */
+public class JsonFileCompactionITCase extends FileCompactionITCaseBase {
+
+	@Override
+	protected String format() {
+		return "json";
+	}
+}
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFileSystemITCase.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFileSystemITCase.java
new file mode 100644
index 0000000..cf3e12c
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFileSystemITCase.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json.debezium;
+
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+
+/**
+ * Test Filesystem connector with DebeziumJson.
+ */
+public class DebeziumJsonFileSystemITCase extends StreamingTestBase {
+
+	private static final List<String> EXPECTED = Arrays.asList(
+			"+I(101,SCOOTER,Small 2-wheel scooter,3.14)",
+			"+I(102,CAR BATTERY,12V car battery,8.1)",
+			"+I(103,12-PACK DRILL BITS,12-pack of drill bits with sizes ranging from #40 to #3,0.8)",
+			"+I(104,HAMMER,12oz carpenter's hammer,0.75)",
+			"+I(105,HAMMER,14oz carpenter's hammer,0.875)",
+			"+I(106,HAMMER,16oz carpenter's hammer,1.0)",
+			"+I(107,ROCKS,box of assorted rocks,5.3)",
+			"+I(108,JACKET,water resistent black wind breaker,0.1)",
+			"+I(109,SPARE TIRE,24 inch spare tire,22.2)",
+			"-D(106,HAMMER,16oz carpenter's hammer,1.0)", // -U
+			"+I(106,HAMMER,18oz carpenter hammer,1.0)", // +U
+			"-D(107,ROCKS,box of assorted rocks,5.3)", // -U
+			"+I(107,ROCKS,box of assorted rocks,5.1)", // +U
+			"+I(110,JACKET,water resistent white wind breaker,0.2)",
+			"+I(111,SCOOTER,Big 2-wheel scooter ,5.18)",
+			"-D(110,JACKET,water resistent white wind breaker,0.2)", // -U
+			"+I(110,JACKET,new water resistent white wind breaker,0.5)", // +U
+			"-D(111,SCOOTER,Big 2-wheel scooter ,5.18)", // -U
+			"+I(111,SCOOTER,Big 2-wheel scooter ,5.17)", // +U
+			"-D(111,SCOOTER,Big 2-wheel scooter ,5.17)"
+	);
+
+	private File source;
+	private File sink;
+
+	private void prepareTables(boolean isPartition) throws IOException {
+		byte[] bytes = readBytes("debezium-data-schema-exclude.txt");
+		source = TEMPORARY_FOLDER.newFolder();
+		File file;
+		if (isPartition) {
+			File partition = new File(source, "p=1");
+			partition.mkdirs();
+			file = new File(partition, "my_file");
+		} else {
+			file = new File(source, "my_file");
+		}
+		file.createNewFile();
+		Files.write(file.toPath(), bytes);
+
+		sink = TEMPORARY_FOLDER.newFolder();
+
+		env().setParallelism(1);
+	}
+
+	private void createTable(boolean isSink, String path, boolean isPartition) {
+		tEnv().executeSql(format("create table %s (", isSink ? "sink" : "source") +
+				"id int, name string," +
+				(isSink ? "upper_name string," : "") +
+				" description string, weight float" +
+				(isPartition ? ", p int) partitioned by (p) " : ")") +
+				" with (" +
+				"'connector'='filesystem'," +
+				"'format'='debezium-json'," +
+				format("'path'='%s'", path) +
+				")");
+	}
+
+	@Test
+	public void testNonPartition() throws Exception {
+		prepareTables(false);
+		createTable(false, source.toURI().toString(), false);
+		createTable(true, sink.toURI().toString(), false);
+
+		tEnv().executeSql("insert into sink select id,name,UPPER(name),description,weight from source").await();
+		CloseableIterator<Row> iter = tEnv()
+				.executeSql("select id,upper_name,description,weight from sink").collect();
+
+		List<String> results = CollectionUtil.iteratorToList(iter).stream()
+				.map(row -> row.getKind().shortString() + "(" + row.toString() + ")")
+				.collect(Collectors.toList());
+		iter.close();
+
+		Assert.assertEquals(EXPECTED, results);
+	}
+
+	@Test
+	public void testPartition() throws Exception {
+		prepareTables(true);
+		createTable(false, source.toURI().toString(), true);
+		createTable(true, sink.toURI().toString(), true);
+
+		tEnv().executeSql("insert into sink select id,name,UPPER(name),description,weight,p from source").await();
+		CloseableIterator<Row> iter = tEnv()
+				.executeSql("select id,upper_name,description,weight,p from sink").collect();
+		List<Row> list = CollectionUtil.iteratorToList(iter);
+		iter.close();
+
+		List<String> results = list.stream()
+				.map(row -> Row.project(row, new int[] {0, 1, 2, 3}))
+				.map(row -> row.getKind().shortString() + "(" + row.toString() + ")")
+				.collect(Collectors.toList());
+
+		Assert.assertEquals(EXPECTED, results);
+
+		// check partition value
+		for (Row row : list) {
+			Assert.assertEquals(1, row.getField(4));
+		}
+	}
+
+	private static byte[] readBytes(String resource) throws IOException {
+		final URL url = DebeziumJsonSerDeSchemaTest.class.getClassLoader().getResource(resource);
+		assert url != null;
+		Path path = new File(url.getFile()).toPath();
+		return Files.readAllBytes(path);
+	}
+}