[FLINK-19823][table][fs-connector] Filesystem connector supports de/serialization schema
This closes #13957
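
With this change the filesystem connector can be paired with regular DeserializationSchema / SerializationSchema based formats (for example debezium-json) instead of requiring a dedicated FileSystemFormatFactory per format, which is why JsonFileSystemFormatFactory is removed below. A minimal sketch of the new usage, mirroring DebeziumJsonFileSystemITCase; the class name, table schema, and path are illustrative, not part of this patch:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class FilesystemDebeziumJsonExample {
    public static void main(String[] args) {
        // Streaming mode so the debezium-json changelog (inserts and deletes) is consumed.
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // The filesystem connector declared with a DeserializationSchema-based format;
        // '/tmp/products' and the column list are illustrative.
        tEnv.executeSql(
                "CREATE TABLE products (" +
                "  id INT," +
                "  name STRING," +
                "  description STRING," +
                "  weight FLOAT" +
                ") WITH (" +
                "  'connector' = 'filesystem'," +
                "  'format' = 'debezium-json'," +
                "  'path' = '/tmp/products'" +
                ")");

        // Reads the Debezium change events from the files under the configured path.
        tEnv.executeSql("SELECT id, UPPER(name) AS upper_name, weight FROM products").print();
    }
}
```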
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonFileSystemFormatFactory.java b/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonFileSystemFormatFactory.java
deleted file mode 100644
index 831b8ca..0000000
--- a/flink-formats-kafka/flink-json-debezium/src/main/java/org/apache/flink/formats/json/JsonFileSystemFormatFactory.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.formats.json;
-
-import org.apache.flink.api.common.io.DelimitedInputFormat;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.serialization.BulkWriter;
-import org.apache.flink.api.common.serialization.Encoder;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.data.GenericRowData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.factories.FileSystemFormatFactory;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.utils.PartitionPathUtils;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.formats.json.JsonFormatFactory.validateFormatOptions;
-import static org.apache.flink.formats.json.JsonOptions.FAIL_ON_MISSING_FIELD;
-import static org.apache.flink.formats.json.JsonOptions.IGNORE_PARSE_ERRORS;
-import static org.apache.flink.formats.json.JsonOptions.TIMESTAMP_FORMAT;
-
-/**
- * Factory to build reader/writer to read/write json format file.
- */
-public class JsonFileSystemFormatFactory implements FileSystemFormatFactory {
-
- public static final String IDENTIFIER = "json";
-
- @Override
- public String factoryIdentifier() {
- return IDENTIFIER;
- }
-
- @Override
- public Set<ConfigOption<?>> requiredOptions() {
- return new HashSet<>();
- }
-
- @Override
- public Set<ConfigOption<?>> optionalOptions() {
- Set<ConfigOption<?>> options = new HashSet<>();
- options.add(FAIL_ON_MISSING_FIELD);
- options.add(IGNORE_PARSE_ERRORS);
- options.add(TIMESTAMP_FORMAT);
- return options;
- }
-
- @Override
- public InputFormat<RowData, ?> createReader(ReaderContext context) {
- ReadableConfig options = context.getFormatOptions();
- validateFormatOptions(options);
- boolean failOnMissingField = options.get(FAIL_ON_MISSING_FIELD);
- boolean ignoreParseErrors = options.get(IGNORE_PARSE_ERRORS);
- TimestampFormat timestampOption = JsonOptions.getTimestampFormat(options);
-
- RowType formatRowType = context.getFormatRowType();
- JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
- formatRowType,
- new GenericTypeInfo(GenericRowData.class),
- failOnMissingField,
- ignoreParseErrors,
- timestampOption);
-
- String[] fieldNames = context.getSchema().getFieldNames();
- List<String> projectFields = Arrays.stream(context.getProjectFields())
- .mapToObj(idx -> fieldNames[idx])
- .collect(Collectors.toList());
- List<String> jsonFields = Arrays.stream(fieldNames)
- .filter(field -> !context.getPartitionKeys().contains(field))
- .collect(Collectors.toList());
-
- int[] jsonSelectFieldToProjectFieldMapping = context.getFormatProjectFields().stream()
- .mapToInt(projectFields::indexOf)
- .toArray();
- int[] jsonSelectFieldToJsonFieldMapping = context.getFormatProjectFields().stream()
- .mapToInt(jsonFields::indexOf)
- .toArray();
-
- return new JsonInputFormat(
- context.getPaths(),
- context.getSchema().getFieldDataTypes(),
- context.getSchema().getFieldNames(),
- context.getProjectFields(),
- context.getPartitionKeys(),
- context.getDefaultPartName(),
- context.getPushedDownLimit(),
- jsonSelectFieldToProjectFieldMapping,
- jsonSelectFieldToJsonFieldMapping,
- deserializationSchema);
- }
-
- @Override
- public Optional<Encoder<RowData>> createEncoder(WriterContext context) {
- return Optional.of(new JsonRowDataEncoder(new JsonRowDataSerializationSchema(context.getFormatRowType(),
- JsonOptions.getTimestampFormat(context.getFormatOptions()))));
- }
-
- @Override
- public Optional<BulkWriter.Factory<RowData>> createBulkWriterFactory(WriterContext context) {
- return Optional.empty();
- }
-
- /**
- * A {@link JsonInputFormat} is responsible to read {@link RowData} records
- * from json format files.
- */
- public static class JsonInputFormat extends DelimitedInputFormat<RowData> {
-
- private static final long serialVersionUID = 1L;
-
- /**
- * Code of \r, used to remove \r from a line when the line ends with \r\n.
- */
- private static final byte CARRIAGE_RETURN = (byte) '\r';
-
- /**
- * Code of \n, used to identify if \n is used as delimiter.
- */
- private static final byte NEW_LINE = (byte) '\n';
-
- private final DataType[] fieldTypes;
- private final String[] fieldNames;
- private final int[] selectFields;
- private final List<String> partitionKeys;
- private final String defaultPartValue;
- private final long limit;
- private final int[] jsonSelectFieldToProjectFieldMapping;
- private final int[] jsonSelectFieldToJsonFieldMapping;
- private final JsonRowDataDeserializationSchema deserializationSchema;
-
- private transient boolean end;
- private transient long emitted;
- // reuse object for per record
- private transient GenericRowData rowData;
-
- public JsonInputFormat(
- Path[] filePaths,
- DataType[] fieldTypes,
- String[] fieldNames,
- int[] selectFields,
- List<String> partitionKeys,
- String defaultPartValue,
- long limit,
- int[] jsonSelectFieldToProjectFieldMapping,
- int[] jsonSelectFieldToJsonFieldMapping,
- JsonRowDataDeserializationSchema deserializationSchema) {
- super.setFilePaths(filePaths);
- this.fieldTypes = fieldTypes;
- this.fieldNames = fieldNames;
- this.selectFields = selectFields;
- this.partitionKeys = partitionKeys;
- this.defaultPartValue = defaultPartValue;
- this.limit = limit;
- this.jsonSelectFieldToProjectFieldMapping = jsonSelectFieldToProjectFieldMapping;
- this.jsonSelectFieldToJsonFieldMapping = jsonSelectFieldToJsonFieldMapping;
- this.deserializationSchema = deserializationSchema;
- }
-
- @Override
- public boolean supportsMultiPaths() {
- return true;
- }
-
- @Override
- public void open(FileInputSplit split) throws IOException {
- super.open(split);
- this.end = false;
- this.emitted = 0L;
- this.rowData = PartitionPathUtils.fillPartitionValueForRecord(fieldNames, fieldTypes, selectFields,
- partitionKeys, currentSplit.getPath(), defaultPartValue);
- }
-
- @Override
- public boolean reachedEnd() {
- return emitted >= limit || end;
- }
-
- @Override
- public RowData readRecord(RowData reuse, byte[] bytes, int offset, int numBytes) throws IOException {
- // remove \r from a line when the line ends with \r\n
- if (this.getDelimiter() != null && this.getDelimiter().length == 1
- && this.getDelimiter()[0] == NEW_LINE && offset + numBytes >= 1
- && bytes[offset + numBytes - 1] == CARRIAGE_RETURN) {
- numBytes -= 1;
- }
- byte[] trimBytes = Arrays.copyOfRange(bytes, offset, offset + numBytes);
- GenericRowData jsonRow = (GenericRowData) deserializationSchema.deserialize(trimBytes);
-
- if (jsonRow == null) {
- return null;
- }
-
- GenericRowData returnRecord = rowData;
- for (int i = 0; i < jsonSelectFieldToJsonFieldMapping.length; i++) {
- returnRecord.setField(jsonSelectFieldToProjectFieldMapping[i],
- jsonRow.getField(jsonSelectFieldToJsonFieldMapping[i]));
- }
-
- emitted++;
- return returnRecord;
- }
-
- @Override
- public RowData nextRecord(RowData record) throws IOException {
- while (true) {
- if (readLine()) {
- RowData row = readRecord(record, this.currBuffer, this.currOffset, this.currLen);
- if (row == null) {
- continue;
- } else {
- return row;
- }
- } else {
- this.end = true;
- return null;
- }
- }
- }
- }
-
- /**
- * A {@link JsonRowDataEncoder} is responsible to encode a {@link RowData} to {@link java.io.OutputStream}
- * with json format.
- */
- public static class JsonRowDataEncoder implements Encoder<RowData> {
-
- private static final long serialVersionUID = 1L;
- private static final String DEFAULT_LINE_DELIMITER = "\n";
- private final JsonRowDataSerializationSchema serializationSchema;
-
- public JsonRowDataEncoder(JsonRowDataSerializationSchema serializationSchema) {
- this.serializationSchema = serializationSchema;
- }
-
- @Override
- public void encode(RowData element, OutputStream stream) throws IOException {
- stream.write(serializationSchema.serialize(element));
- stream.write(DEFAULT_LINE_DELIMITER.getBytes(StandardCharsets.UTF_8));
- }
- }
-}
diff --git a/flink-formats-kafka/flink-json-debezium/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-formats-kafka/flink-json-debezium/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 942b3dc..08f4657 100644
--- a/flink-formats-kafka/flink-json-debezium/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/flink-formats-kafka/flink-json-debezium/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.flink.formats.json.JsonFileSystemFormatFactory
org.apache.flink.formats.json.JsonFormatFactory
org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory
org.apache.flink.formats.json.canal.CanalJsonFormatFactory
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonBatchFileSystemITCase.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonBatchFileSystemITCase.java
index f1ee52b..7d17dbd 100644
--- a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonBatchFileSystemITCase.java
+++ b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonBatchFileSystemITCase.java
@@ -31,7 +31,7 @@
import java.util.List;
/**
- * ITCase to test json format for {@link JsonFileSystemFormatFactory}.
+ * ITCase to test the json format via {@link JsonFormatFactory}.
*/
public class JsonBatchFileSystemITCase extends BatchFileSystemITCaseBase {
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonFileCompactionITCase.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonFileCompactionITCase.java
new file mode 100644
index 0000000..dd1a8cc
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/JsonFileCompactionITCase.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json;
+
+import org.apache.flink.table.planner.runtime.stream.sql.FileCompactionITCaseBase;
+
+/**
+ * ITCase for file compaction with the json format.
+ */
+public class JsonFileCompactionITCase extends FileCompactionITCaseBase {
+
+ @Override
+ protected String format() {
+ return "json";
+ }
+}
diff --git a/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFileSystemITCase.java b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFileSystemITCase.java
new file mode 100644
index 0000000..cf3e12c
--- /dev/null
+++ b/flink-formats-kafka/flink-json-debezium/src/test/java/org/apache/flink/formats/json/debezium/DebeziumJsonFileSystemITCase.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json.debezium;
+
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+
+/**
+ * ITCase for the filesystem connector with the debezium-json format.
+ */
+public class DebeziumJsonFileSystemITCase extends StreamingTestBase {
+
+ private static final List<String> EXPECTED = Arrays.asList(
+ "+I(101,SCOOTER,Small 2-wheel scooter,3.14)",
+ "+I(102,CAR BATTERY,12V car battery,8.1)",
+ "+I(103,12-PACK DRILL BITS,12-pack of drill bits with sizes ranging from #40 to #3,0.8)",
+ "+I(104,HAMMER,12oz carpenter's hammer,0.75)",
+ "+I(105,HAMMER,14oz carpenter's hammer,0.875)",
+ "+I(106,HAMMER,16oz carpenter's hammer,1.0)",
+ "+I(107,ROCKS,box of assorted rocks,5.3)",
+ "+I(108,JACKET,water resistent black wind breaker,0.1)",
+ "+I(109,SPARE TIRE,24 inch spare tire,22.2)",
+ "-D(106,HAMMER,16oz carpenter's hammer,1.0)", // -U
+ "+I(106,HAMMER,18oz carpenter hammer,1.0)", // +U
+ "-D(107,ROCKS,box of assorted rocks,5.3)", // -U
+ "+I(107,ROCKS,box of assorted rocks,5.1)", // +U
+ "+I(110,JACKET,water resistent white wind breaker,0.2)",
+ "+I(111,SCOOTER,Big 2-wheel scooter ,5.18)",
+ "-D(110,JACKET,water resistent white wind breaker,0.2)", // -U
+ "+I(110,JACKET,new water resistent white wind breaker,0.5)", // +U
+ "-D(111,SCOOTER,Big 2-wheel scooter ,5.18)", // -U
+ "+I(111,SCOOTER,Big 2-wheel scooter ,5.17)", // +U
+ "-D(111,SCOOTER,Big 2-wheel scooter ,5.17)"
+ );
+
+ private File source;
+ private File sink;
+
+ private void prepareTables(boolean isPartition) throws IOException {
+ byte[] bytes = readBytes("debezium-data-schema-exclude.txt");
+ source = TEMPORARY_FOLDER.newFolder();
+ File file;
+ if (isPartition) {
+ File partition = new File(source, "p=1");
+ partition.mkdirs();
+ file = new File(partition, "my_file");
+ } else {
+ file = new File(source, "my_file");
+ }
+ file.createNewFile();
+ Files.write(file.toPath(), bytes);
+
+ sink = TEMPORARY_FOLDER.newFolder();
+
+ env().setParallelism(1);
+ }
+
+ private void createTable(boolean isSink, String path, boolean isPartition) {
+ tEnv().executeSql(format("create table %s (", isSink ? "sink" : "source") +
+ "id int, name string," +
+ (isSink ? "upper_name string," : "") +
+ " description string, weight float" +
+ (isPartition ? ", p int) partitioned by (p) " : ")") +
+ " with (" +
+ "'connector'='filesystem'," +
+ "'format'='debezium-json'," +
+ format("'path'='%s'", path) +
+ ")");
+ }
+
+ @Test
+ public void testNonPartition() throws Exception {
+ prepareTables(false);
+ createTable(false, source.toURI().toString(), false);
+ createTable(true, sink.toURI().toString(), false);
+
+ tEnv().executeSql("insert into sink select id,name,UPPER(name),description,weight from source").await();
+ CloseableIterator<Row> iter = tEnv()
+ .executeSql("select id,upper_name,description,weight from sink").collect();
+
+ List<String> results = CollectionUtil.iteratorToList(iter).stream()
+ .map(row -> row.getKind().shortString() + "(" + row.toString() + ")")
+ .collect(Collectors.toList());
+ iter.close();
+
+ Assert.assertEquals(EXPECTED, results);
+ }
+
+ @Test
+ public void testPartition() throws Exception {
+ prepareTables(true);
+ createTable(false, source.toURI().toString(), true);
+ createTable(true, sink.toURI().toString(), true);
+
+ tEnv().executeSql("insert into sink select id,name,UPPER(name),description,weight,p from source").await();
+ CloseableIterator<Row> iter = tEnv()
+ .executeSql("select id,upper_name,description,weight,p from sink").collect();
+ List<Row> list = CollectionUtil.iteratorToList(iter);
+ iter.close();
+
+ List<String> results = list.stream()
+ .map(row -> Row.project(row, new int[] {0, 1, 2, 3}))
+ .map(row -> row.getKind().shortString() + "(" + row.toString() + ")")
+ .collect(Collectors.toList());
+
+ Assert.assertEquals(EXPECTED, results);
+
+ // check partition value
+ for (Row row : list) {
+ Assert.assertEquals(1, row.getField(4));
+ }
+ }
+
+ private static byte[] readBytes(String resource) throws IOException {
+ final URL url = DebeziumJsonSerDeSchemaTest.class.getClassLoader().getResource(resource);
+ assert url != null;
+ Path path = new File(url.getFile()).toPath();
+ return Files.readAllBytes(path);
+ }
+}