[Improve][Connector-V2] Improve read parquet (#2841)

* [Improve][Connector-V2] Improve read parquet & add parquet test

* [Improve][Connector-V2] Add shade config in file connectors

* [improve][build] improve maven-shade-plugin relocation for connector-file

Co-authored-by: Zongwen Li <zongwen.li.tech@gmail.com>
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
index dce7647..f2e425e 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
@@ -35,7 +35,7 @@
         <commons.collecton4.version>4.4</commons.collecton4.version>
         <commons.lang3.version>3.4</commons.lang3.version>
         <flink.hadoop.version>2.7.5-7.0</flink.hadoop.version>
-        <parquet-avro.version>1.10.0</parquet-avro.version>
+        <parquet-avro.version>1.12.3</parquet-avro.version>
     </properties>
     
     <dependencyManagement>
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index 789ecb7..da463c4 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
 
+import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
+
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
@@ -50,6 +52,7 @@
     @Override
     public Configuration getConfiguration(HadoopConf hadoopConf) {
         Configuration configuration = new Configuration();
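+        // READ_INT96_AS_FIXED lets parquet-avro surface legacy INT96 timestamps as 12-byte fixed values instead of failing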
+        configuration.set(READ_INT96_AS_FIXED, "true");
         if (hadoopConf != null) {
             configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey());
             configuration.set("fs.hdfs.impl", hadoopConf.getFsHdfsImpl());
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
index 82e05be..241b6f4 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
@@ -18,17 +18,20 @@
 package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
 
 import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
@@ -36,21 +39,30 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.example.data.simple.NanoTime;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
 
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 @Slf4j
@@ -59,8 +71,10 @@
     private SeaTunnelRowType seaTunnelRowType;
 
     private static final byte[] PARQUET_MAGIC = new byte[]{(byte) 'P', (byte) 'A', (byte) 'R', (byte) '1'};
+    private static final long NANOS_PER_MILLISECOND = 1000000;
+    private static final long MILLIS_PER_DAY = TimeUnit.DAYS.toMillis(1L);
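+    // 2440588 is the Julian day number of the Unix epoch (1970-01-01)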
+    private static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
 
-    @SuppressWarnings("unchecked")
     @Override
     public void read(String path, Collector<SeaTunnelRow> output) throws Exception {
         if (Boolean.FALSE.equals(checkFileType(path))) {
@@ -75,35 +89,105 @@
                 Object[] fields = new Object[fieldsCount];
                 for (int i = 0; i < fieldsCount; i++) {
                     Object data = record.get(i);
-                    try {
-                        if (data instanceof GenericData.Fixed) {
-                            // judge the data in upstream is or not decimal type
-                            data = fixed2String((GenericData.Fixed) data);
-                        } else if (data instanceof ArrayList) {
-                            // judge the data in upstream is or not array type
-                            data = array2String((ArrayList<GenericData.Record>) data);
-                        }
-                    } catch (Exception e) {
-                        data = record.get(i);
-                    } finally {
-                        fields[i] = data.toString();
-                    }
+                    fields[i] = resolveObject(data, seaTunnelRowType.getFieldType(i));
                 }
                 output.collect(new SeaTunnelRow(fields));
             }
         }
     }
 
+    private Object resolveObject(Object field, SeaTunnelDataType<?> fieldType) {
+        switch (fieldType.getSqlType()) {
+            case ARRAY:
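+                // parquet-avro wraps each list element in a GenericData.Record with a single field named "array_element"; unwrap it before converting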
+                List<Object> origArray = ((ArrayList<Object>) field).stream().map(item -> ((GenericData.Record) item).get("array_element")).collect(Collectors.toList());
+                SeaTunnelDataType<?> elementType = ((ArrayType<?, ?>) fieldType).getElementType();
+                switch (elementType.getSqlType()) {
+                    case STRING:
+                        return origArray.toArray(new String[0]);
+                    case BOOLEAN:
+                        return origArray.toArray(new Boolean[0]);
+                    case TINYINT:
+                        return origArray.toArray(new Byte[0]);
+                    case SMALLINT:
+                        return origArray.toArray(new Short[0]);
+                    case INT:
+                        return origArray.toArray(new Integer[0]);
+                    case BIGINT:
+                        return origArray.toArray(new Long[0]);
+                    case FLOAT:
+                        return origArray.toArray(new Float[0]);
+                    case DOUBLE:
+                        return origArray.toArray(new Double[0]);
+                    default:
+                        String errorMsg = String.format("SeaTunnel array type not support this type [%s] now", fieldType.getSqlType());
+                        throw new UnsupportedOperationException(errorMsg);
+                }
+            case MAP:
+                HashMap<Object, Object> dataMap = new HashMap<>();
+                SeaTunnelDataType<?> keyType = ((MapType<?, ?>) fieldType).getKeyType();
+                SeaTunnelDataType<?> valueType = ((MapType<?, ?>) fieldType).getValueType();
+                HashMap<Object, Object> origDataMap = (HashMap<Object, Object>) field;
+                origDataMap.forEach((key, value) -> dataMap.put(resolveObject(key, keyType), resolveObject(value, valueType)));
+                return dataMap;
+            case BOOLEAN:
+            case INT:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+                return field;
+            case STRING:
+                return field.toString();
+            case TINYINT:
+                return Byte.parseByte(field.toString());
+            case SMALLINT:
+                return Short.parseShort(field.toString());
+            case DECIMAL:
+                int precision = ((DecimalType) fieldType).getPrecision();
+                int scale = ((DecimalType) fieldType).getScale();
+                return bytes2Decimal(((GenericData.Fixed) field).bytes(), precision, scale);
+            case NULL:
+                return null;
+            case BYTES:
+                ByteBuffer buffer = (ByteBuffer) field;
+                byte[] bytes = new byte[buffer.remaining()];
+                buffer.get(bytes, 0, bytes.length);
+                return bytes;
+            case DATE:
+                return LocalDate.ofEpochDay(Long.parseLong(field.toString()));
+            case TIMESTAMP:
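+                // an INT96 value read as fixed holds 8 bytes of nanos-of-day followed by a 4-byte Julian day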
+                Binary binary = Binary.fromConstantByteArray(((GenericData.Fixed) field).bytes());
+                NanoTime nanoTime = NanoTime.fromBinary(binary);
+                int julianDay = nanoTime.getJulianDay();
+                long nanosOfDay = nanoTime.getTimeOfDayNanos();
+                long timestamp = (julianDay - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * MILLIS_PER_DAY + nanosOfDay / NANOS_PER_MILLISECOND;
+                return new Timestamp(timestamp).toLocalDateTime();
+            case ROW:
+                SeaTunnelRowType rowType = (SeaTunnelRowType) fieldType;
+                Object[] objects = new Object[rowType.getTotalFields()];
+                for (int i = 0; i < rowType.getTotalFields(); i++) {
+                    SeaTunnelDataType<?> dataType = rowType.getFieldType(i);
+                    objects[i] = resolveObject(((GenericRecord) field).get(i), dataType);
+                }
+                return new SeaTunnelRow(objects);
+            default:
+                // unreachable: every supported SqlType is handled above
+                throw new UnsupportedOperationException(String.format("SeaTunnel does not support this data type [%s] yet", fieldType.getSqlType()));
+        }
+    }
+
     @Override
     public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) throws FilePluginException {
         if (seaTunnelRowType != null) {
             return seaTunnelRowType;
         }
-        Configuration configuration = getConfiguration(hadoopConf);
         Path filePath = new Path(path);
         ParquetMetadata metadata;
         try {
-            metadata = ParquetFileReader.readFooter(configuration, filePath);
+            HadoopInputFile hadoopInputFile = HadoopInputFile.fromPath(filePath, getConfiguration());
+            try (ParquetFileReader reader = ParquetFileReader.open(hadoopInputFile)) {
+                metadata = reader.getFooter();
+            }
         } catch (IOException e) {
             throw new FilePluginException("Create parquet reader failed", e);
         }
@@ -114,21 +198,111 @@
         SeaTunnelDataType<?>[] types = new SeaTunnelDataType[fieldCount];
         for (int i = 0; i < fieldCount; i++) {
             fields[i] = schema.getFieldName(i);
-            // Temporarily each field is treated as a string type
-            // I think we can use the schema information to build seatunnel column type
-            types[i] = BasicType.STRING_TYPE;
+            Type type = schema.getType(i);
+            SeaTunnelDataType<?> fieldType = parquetType2SeaTunnelType(type);
+            types[i] = fieldType;
         }
         seaTunnelRowType = new SeaTunnelRowType(fields, types);
         return seaTunnelRowType;
     }
 
-    private String fixed2String(GenericData.Fixed fixed) {
-        Schema schema = fixed.getSchema();
-        byte[] bytes = fixed.bytes();
-        int precision = Integer.parseInt(schema.getObjectProps().get("precision").toString());
-        int scale = Integer.parseInt(schema.getObjectProps().get("scale").toString());
-        BigDecimal bigDecimal = bytes2Decimal(bytes, precision, scale);
-        return bigDecimal.toString();
+    private SeaTunnelDataType<?> parquetType2SeaTunnelType(Type type) {
+        if (type.isPrimitive()) {
+            switch (type.asPrimitiveType().getPrimitiveTypeName()) {
+                case INT32:
+                    OriginalType originalType = type.asPrimitiveType().getOriginalType();
+                    if (originalType == null) {
+                        return BasicType.INT_TYPE;
+                    }
+                    switch (originalType) {
+                        case INT_8:
+                            return BasicType.BYTE_TYPE;
+                        case INT_16:
+                            return BasicType.SHORT_TYPE;
+                        case DATE:
+                            return LocalTimeType.LOCAL_DATE_TYPE;
+                        default:
+                            String errorMsg = String.format("Not support this type [%s]", type);
+                            throw new UnsupportedOperationException(errorMsg);
+                    }
+                case INT64:
+                    return BasicType.LONG_TYPE;
+                case INT96:
+                    return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+                case BINARY:
+                    if (type.asPrimitiveType().getOriginalType() == null) {
+                        return PrimitiveByteArrayType.INSTANCE;
+                    }
+                    return BasicType.STRING_TYPE;
+                case FLOAT:
+                    return BasicType.FLOAT_TYPE;
+                case DOUBLE:
+                    return BasicType.DOUBLE_TYPE;
+                case BOOLEAN:
+                    return BasicType.BOOLEAN_TYPE;
+                case FIXED_LEN_BYTE_ARRAY:
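+                    // the logical type annotation renders as DECIMAL(precision,scale); strip the wrapper and parse both numbers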
+                    String typeInfo = type.getLogicalTypeAnnotation().toString()
+                            .replaceAll(SqlType.DECIMAL.toString(), "")
+                            .replaceAll("\\(", "")
+                            .replaceAll("\\)", "");
+                    String[] splits = typeInfo.split(",");
+                    int precision = Integer.parseInt(splits[0]);
+                    int scale = Integer.parseInt(splits[1]);
+                    return new DecimalType(precision, scale);
+                default:
+                    String errorMsg = String.format("Not support this type [%s]", type);
+                    throw new UnsupportedOperationException(errorMsg);
+            }
+        } else {
+            LogicalTypeAnnotation logicalTypeAnnotation = type.asGroupType().getLogicalTypeAnnotation();
+            if (logicalTypeAnnotation == null) {
+                // struct type
+                List<Type> fields = type.asGroupType().getFields();
+                String[] fieldNames = new String[fields.size()];
+                SeaTunnelDataType<?>[] seaTunnelDataTypes = new SeaTunnelDataType<?>[fields.size()];
+                for (int i = 0; i < fields.size(); i++) {
+                    Type fieldType = fields.get(i);
+                    SeaTunnelDataType<?> seaTunnelDataType = parquetType2SeaTunnelType(fields.get(i));
+                    fieldNames[i] = fieldType.getName();
+                    seaTunnelDataTypes[i] = seaTunnelDataType;
+                }
+                return new SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
+            } else {
+                switch (logicalTypeAnnotation.toOriginalType()) {
+                    case MAP:
+                        GroupType groupType = type.asGroupType().getType(0).asGroupType();
+                        SeaTunnelDataType<?> keyType = parquetType2SeaTunnelType(groupType.getType(0));
+                        SeaTunnelDataType<?> valueType = parquetType2SeaTunnelType(groupType.getType(1));
+                        return new MapType<>(keyType, valueType);
+                    case LIST:
+                        Type elementType = type.asGroupType().getType(0).asGroupType().getType(0);
+                        SeaTunnelDataType<?> fieldType = parquetType2SeaTunnelType(elementType);
+                        switch (fieldType.getSqlType()) {
+                            case STRING:
+                                return ArrayType.STRING_ARRAY_TYPE;
+                            case BOOLEAN:
+                                return ArrayType.BOOLEAN_ARRAY_TYPE;
+                            case TINYINT:
+                                return ArrayType.BYTE_ARRAY_TYPE;
+                            case SMALLINT:
+                                return ArrayType.SHORT_ARRAY_TYPE;
+                            case INT:
+                                return ArrayType.INT_ARRAY_TYPE;
+                            case BIGINT:
+                                return ArrayType.LONG_ARRAY_TYPE;
+                            case FLOAT:
+                                return ArrayType.FLOAT_ARRAY_TYPE;
+                            case DOUBLE:
+                                return ArrayType.DOUBLE_ARRAY_TYPE;
+                            default:
+                                String errorMsg = String.format("SeaTunnel array type not supported this genericType [%s] yet", fieldType);
+                                throw new UnsupportedOperationException(errorMsg);
+                        }
+                    default:
+                        throw new UnsupportedOperationException("SeaTunnel file connector not support this nest type");
+                }
+            }
+        }
     }
 
     @SuppressWarnings("checkstyle:MagicNumber")
@@ -177,10 +351,4 @@
             throw new RuntimeException(errorMsg, e);
         }
     }
-
-    private String array2String(ArrayList<GenericData.Record> data) throws JsonProcessingException {
-        ObjectMapper objectMapper = new ObjectMapper();
-        List<String> values = data.stream().map(record -> record.get(0).toString()).collect(Collectors.toList());
-        return objectMapper.writeValueAsString(values);
-    }
 }
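Note: the INT96-to-LocalDateTime conversion above can be exercised in isolation. A minimal standalone sketch (the class and method names here are illustrative, assuming parquet-column 1.12.x on the classpath):

    import org.apache.parquet.example.data.simple.NanoTime;
    import org.apache.parquet.io.api.Binary;

    import java.sql.Timestamp;
    import java.time.LocalDateTime;

    public class Int96Demo {
        private static final long NANOS_PER_MILLISECOND = 1000000;
        private static final long MILLIS_PER_DAY = 24L * 60 * 60 * 1000;
        private static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;

        // Decode a 12-byte INT96 value (8 bytes of nanos-of-day, then a 4-byte Julian day)
        static LocalDateTime int96ToLocalDateTime(byte[] int96) {
            NanoTime nanoTime = NanoTime.fromBinary(Binary.fromConstantByteArray(int96));
            long millis = (nanoTime.getJulianDay() - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * MILLIS_PER_DAY
                    + nanoTime.getTimeOfDayNanos() / NANOS_PER_MILLISECOND;
            return new Timestamp(millis).toLocalDateTime();
        }
    }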
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
new file mode 100644
index 0000000..c86d54e
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.writer;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.net.URL;
+import java.nio.file.Paths;
+
+public class ParquetReadStrategyTest {
+    @Test
+    public void testParquetRead() throws Exception {
+        URL resource = ParquetReadStrategyTest.class.getResource("/test.parquet");
+        Assertions.assertNotNull(resource);
+        String path = Paths.get(resource.toURI()).toString();
+        ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
+        parquetReadStrategy.init(null);
+        SeaTunnelRowType seaTunnelRowTypeInfo = parquetReadStrategy.getSeaTunnelRowTypeInfo(null, path);
+        Assertions.assertNotNull(seaTunnelRowTypeInfo);
+        TestCollector testCollector = new TestCollector();
+        parquetReadStrategy.read(path, testCollector);
+    }
+
+    public static class TestCollector implements Collector<SeaTunnelRow> {
+
+        @SuppressWarnings("checkstyle:RegexpSingleline")
+        @Override
+        public void collect(SeaTunnelRow record) {
+            System.out.println(record);
+        }
+
+        @Override
+        public Object getCheckpointLock() {
+            return null;
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test.parquet b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test.parquet
new file mode 100644
index 0000000..123ef52
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test.parquet
Binary files differ
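Note: test.parquet is a binary fixture checked in as a test resource. A sketch of how a comparable fixture could be generated with parquet-avro (the schema and values are hypothetical, not the actual fixture contents; assumes hadoop-client and parquet-avro 1.12.x on the classpath):

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetWriter;
    import org.apache.parquet.hadoop.ParquetWriter;

    public class TestParquetGenerator {
        public static void main(String[] args) throws Exception {
            // hypothetical two-column schema; the real test.parquet schema may differ
            Schema schema = SchemaBuilder.record("row").fields()
                    .requiredString("name")
                    .requiredInt("age")
                    .endRecord();
            try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
                    .<GenericRecord>builder(new Path("src/test/resources/test.parquet"))
                    .withSchema(schema)
                    .build()) {
                GenericRecord record = new GenericData.Record(schema);
                record.put("name", "seatunnel");
                record.put("age", 2);
                writer.write(record);
            }
        }
    }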
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml
index 6241a81..94bd660 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml
@@ -29,6 +29,10 @@
     <modelVersion>4.0.0</modelVersion>
     <artifactId>connector-file-ftp</artifactId>
 
+    <properties>
+        <connector.name>file.ftp</connector.name>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/pom.xml
index 330d7c6..dc59fe9 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/pom.xml
@@ -29,6 +29,10 @@
 
     <artifactId>connector-file-hadoop</artifactId>
 
+    <properties>
+        <connector.name>file.hadoop</connector.name>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
@@ -40,4 +44,5 @@
             <artifactId>flink-shaded-hadoop-2</artifactId>
         </dependency>
     </dependencies>
+
 </project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml
index 58b9d01..93f000f 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml
@@ -29,6 +29,10 @@
 
     <artifactId>connector-file-local</artifactId>
 
+    <properties>
+        <connector.name>file.local</connector.name>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
@@ -40,4 +44,5 @@
             <artifactId>flink-shaded-hadoop-2</artifactId>
         </dependency>
     </dependencies>
+
 </project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-oss/pom.xml
index 0a0ae91..4e2ca4d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/pom.xml
@@ -30,6 +30,7 @@
     <artifactId>connector-file-oss</artifactId>
     <properties>
         <hadoop-aliyun.version>2.9.2</hadoop-aliyun.version>
+        <connector.name>file.oss</connector.name>
     </properties>
 
     <dependencies>
diff --git a/seatunnel-connectors-v2/connector-file/pom.xml b/seatunnel-connectors-v2/connector-file/pom.xml
index 00a3cc1..f71d739 100644
--- a/seatunnel-connectors-v2/connector-file/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/pom.xml
@@ -36,4 +36,46 @@
         <module>connector-file-oss</module>
         <module>connector-file-ftp</module>
     </modules>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.apache.avro</pattern>
+                                    <!--suppress UnresolvedMavenProperty, this property is added by submodule-->
+                                    <shadedPattern>${seatunnel.shade.package}.${connector.name}.org.apache.avro</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.parquet</pattern>
+                                    <!--suppress UnresolvedMavenProperty -->
+                                    <shadedPattern>${seatunnel.shade.package}.${connector.name}.org.apache.parquet</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>shaded.parquet</pattern>
+                                    <!--suppress UnresolvedMavenProperty -->
+                                    <shadedPattern>${seatunnel.shade.package}.${connector.name}.shaded.parquet</shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <!-- make sure that flatten runs after maven-shade-plugin -->
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>flatten-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
 </project>
\ No newline at end of file