[Improve][Connector-V2] Improve read parquet (#2841)
* [Improve][Connector-V2] Improve read parquet & add parquet test
* [Improve][Connector-V2] Add shade config in file connectors
* [improve][build] improve maven-shade-plugin relocation for connector-file
Co-authored-by: Zongwen Li <zongwen.li.tech@gmail.com>
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
index dce7647..f2e425e 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
@@ -35,7 +35,7 @@
<commons.collecton4.version>4.4</commons.collecton4.version>
<commons.lang3.version>3.4</commons.lang3.version>
<flink.hadoop.version>2.7.5-7.0</flink.hadoop.version>
- <parquet-avro.version>1.10.0</parquet-avro.version>
+ <parquet-avro.version>1.12.3</parquet-avro.version>
</properties>
<dependencyManagement>
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index 789ecb7..da463c4 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
+import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
@@ -50,6 +52,7 @@
@Override
public Configuration getConfiguration(HadoopConf hadoopConf) {
Configuration configuration = new Configuration();
+ configuration.set(READ_INT96_AS_FIXED, "true");
if (hadoopConf != null) {
configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey());
configuration.set("fs.hdfs.impl", hadoopConf.getFsHdfsImpl());
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
index 82e05be..241b6f4 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
@@ -18,17 +18,20 @@
package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
@@ -36,21 +39,30 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.example.data.simple.NanoTime;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.Type;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Slf4j
@@ -59,8 +71,10 @@
private SeaTunnelRowType seaTunnelRowType;
private static final byte[] PARQUET_MAGIC = new byte[]{(byte) 'P', (byte) 'A', (byte) 'R', (byte) '1'};
+ private static final long NANOS_PER_MILLISECOND = 1000000;
+ private static final long MILLIS_PER_DAY = TimeUnit.DAYS.toMillis(1L);
+ private static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
- @SuppressWarnings("unchecked")
@Override
public void read(String path, Collector<SeaTunnelRow> output) throws Exception {
if (Boolean.FALSE.equals(checkFileType(path))) {
@@ -75,35 +89,105 @@
Object[] fields = new Object[fieldsCount];
for (int i = 0; i < fieldsCount; i++) {
Object data = record.get(i);
- try {
- if (data instanceof GenericData.Fixed) {
- // judge the data in upstream is or not decimal type
- data = fixed2String((GenericData.Fixed) data);
- } else if (data instanceof ArrayList) {
- // judge the data in upstream is or not array type
- data = array2String((ArrayList<GenericData.Record>) data);
- }
- } catch (Exception e) {
- data = record.get(i);
- } finally {
- fields[i] = data.toString();
- }
+ fields[i] = resolveObject(data, seaTunnelRowType.getFieldType(i));
}
output.collect(new SeaTunnelRow(fields));
}
}
}
+ private Object resolveObject(Object field, SeaTunnelDataType<?> fieldType) {
+ switch (fieldType.getSqlType()) {
+ case ARRAY:
+ List<Object> origArray = ((ArrayList<Object>) field).stream().map(item -> ((GenericData.Record) item).get("array_element")).collect(Collectors.toList());
+ SeaTunnelDataType<?> elementType = ((ArrayType<?, ?>) fieldType).getElementType();
+ switch (elementType.getSqlType()) {
+ case STRING:
+ return origArray.toArray(new String[0]);
+ case BOOLEAN:
+ return origArray.toArray(new Boolean[0]);
+ case TINYINT:
+ return origArray.toArray(new Byte[0]);
+ case SMALLINT:
+ return origArray.toArray(new Short[0]);
+ case INT:
+ return origArray.toArray(new Integer[0]);
+ case BIGINT:
+ return origArray.toArray(new Long[0]);
+ case FLOAT:
+ return origArray.toArray(new Float[0]);
+ case DOUBLE:
+ return origArray.toArray(new Double[0]);
+ default:
+ String errorMsg = String.format("SeaTunnel array type not support this type [%s] now", fieldType.getSqlType());
+ throw new UnsupportedOperationException(errorMsg);
+ }
+ case MAP:
+ HashMap<Object, Object> dataMap = new HashMap<>();
+ SeaTunnelDataType<?> keyType = ((MapType<?, ?>) fieldType).getKeyType();
+ SeaTunnelDataType<?> valueType = ((MapType<?, ?>) fieldType).getValueType();
+ HashMap<Object, Object> origDataMap = (HashMap<Object, Object>) field;
+ origDataMap.forEach((key, value) -> dataMap.put(resolveObject(key, keyType), resolveObject(value, valueType)));
+ return dataMap;
+ case BOOLEAN:
+ case INT:
+ case BIGINT:
+ case FLOAT:
+ case DOUBLE:
+ return field;
+ case STRING:
+ return field.toString();
+ case TINYINT:
+ return Byte.parseByte(field.toString());
+ case SMALLINT:
+ return Short.parseShort(field.toString());
+ case DECIMAL:
+ int precision = ((DecimalType) fieldType).getPrecision();
+ int scale = ((DecimalType) fieldType).getScale();
+ return bytes2Decimal(((GenericData.Fixed) field).bytes(), precision, scale);
+ case NULL:
+ return null;
+ case BYTES:
+ ByteBuffer buffer = (ByteBuffer) field;
+ byte[] bytes = new byte[buffer.remaining()];
+ buffer.get(bytes, 0, bytes.length);
+ return bytes;
+ case DATE:
+ return LocalDate.ofEpochDay(Long.parseLong(field.toString()));
+ case TIMESTAMP:
+ Binary binary = Binary.fromConstantByteArray(((GenericData.Fixed) field).bytes());
+ NanoTime nanoTime = NanoTime.fromBinary(binary);
+ int julianDay = nanoTime.getJulianDay();
+ long nanosOfDay = nanoTime.getTimeOfDayNanos();
+ long timestamp = (julianDay - JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH) * MILLIS_PER_DAY + nanosOfDay / NANOS_PER_MILLISECOND;
+ return new Timestamp(timestamp).toLocalDateTime();
+ case ROW:
+ SeaTunnelRowType rowType = (SeaTunnelRowType) fieldType;
+ Object[] objects = new Object[rowType.getTotalFields()];
+ for (int i = 0; i < rowType.getTotalFields(); i++) {
+ SeaTunnelDataType<?> dataType = rowType.getFieldType(i);
+ objects[i] = resolveObject(((GenericRecord) field).get(i), dataType);
+ }
+ return new SeaTunnelRow(objects);
+ default:
+                // Unreachable in practice: every supported SqlType is handled above,
+                // but throw defensively in case a new type is added to the enum.
+ throw new UnsupportedOperationException("SeaTunnel not support this data type now");
+ }
+ }
+
@Override
public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) throws FilePluginException {
if (seaTunnelRowType != null) {
return seaTunnelRowType;
}
- Configuration configuration = getConfiguration(hadoopConf);
Path filePath = new Path(path);
ParquetMetadata metadata;
try {
- metadata = ParquetFileReader.readFooter(configuration, filePath);
+ HadoopInputFile hadoopInputFile = HadoopInputFile.fromPath(filePath, getConfiguration());
+ ParquetFileReader reader = ParquetFileReader.open(hadoopInputFile);
+ metadata = reader.getFooter();
+ reader.close();
} catch (IOException e) {
throw new FilePluginException("Create parquet reader failed", e);
}
@@ -114,21 +198,111 @@
SeaTunnelDataType<?>[] types = new SeaTunnelDataType[fieldCount];
for (int i = 0; i < fieldCount; i++) {
fields[i] = schema.getFieldName(i);
- // Temporarily each field is treated as a string type
- // I think we can use the schema information to build seatunnel column type
- types[i] = BasicType.STRING_TYPE;
+ Type type = schema.getType(i);
+ SeaTunnelDataType<?> fieldType = parquetType2SeaTunnelType(type);
+ types[i] = fieldType;
}
seaTunnelRowType = new SeaTunnelRowType(fields, types);
return seaTunnelRowType;
}
- private String fixed2String(GenericData.Fixed fixed) {
- Schema schema = fixed.getSchema();
- byte[] bytes = fixed.bytes();
- int precision = Integer.parseInt(schema.getObjectProps().get("precision").toString());
- int scale = Integer.parseInt(schema.getObjectProps().get("scale").toString());
- BigDecimal bigDecimal = bytes2Decimal(bytes, precision, scale);
- return bigDecimal.toString();
+ private SeaTunnelDataType<?> parquetType2SeaTunnelType(Type type) {
+ if (type.isPrimitive()) {
+ switch (type.asPrimitiveType().getPrimitiveTypeName()) {
+ case INT32:
+ OriginalType originalType = type.asPrimitiveType().getOriginalType();
+ if (originalType == null) {
+ return BasicType.INT_TYPE;
+ }
+ switch (type.asPrimitiveType().getOriginalType()) {
+ case INT_8:
+ return BasicType.BYTE_TYPE;
+ case INT_16:
+ return BasicType.SHORT_TYPE;
+ case DATE:
+ return LocalTimeType.LOCAL_DATE_TYPE;
+ default:
+ String errorMsg = String.format("Not support this type [%s]", type);
+ throw new UnsupportedOperationException(errorMsg);
+ }
+ case INT64:
+ return BasicType.LONG_TYPE;
+ case INT96:
+ return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+ case BINARY:
+ if (type.asPrimitiveType().getOriginalType() == null) {
+ return PrimitiveByteArrayType.INSTANCE;
+ }
+ return BasicType.STRING_TYPE;
+ case FLOAT:
+ return BasicType.FLOAT_TYPE;
+ case DOUBLE:
+ return BasicType.DOUBLE_TYPE;
+ case BOOLEAN:
+ return BasicType.BOOLEAN_TYPE;
+ case FIXED_LEN_BYTE_ARRAY:
+ String typeInfo = type.getLogicalTypeAnnotation().toString()
+ .replaceAll(SqlType.DECIMAL.toString(), "")
+ .replaceAll("\\(", "")
+ .replaceAll("\\)", "");
+ String[] splits = typeInfo.split(",");
+ int precision = Integer.parseInt(splits[0]);
+ int scale = Integer.parseInt(splits[1]);
+ return new DecimalType(precision, scale);
+ default:
+ String errorMsg = String.format("Not support this type [%s]", type);
+ throw new UnsupportedOperationException(errorMsg);
+ }
+ } else {
+ LogicalTypeAnnotation logicalTypeAnnotation = type.asGroupType().getLogicalTypeAnnotation();
+ if (logicalTypeAnnotation == null) {
+                // no logical type annotation on this group: treat it as a struct (row) type
+ List<Type> fields = type.asGroupType().getFields();
+ String[] fieldNames = new String[fields.size()];
+ SeaTunnelDataType<?>[] seaTunnelDataTypes = new SeaTunnelDataType<?>[fields.size()];
+ for (int i = 0; i < fields.size(); i++) {
+ Type fieldType = fields.get(i);
+ SeaTunnelDataType<?> seaTunnelDataType = parquetType2SeaTunnelType(fields.get(i));
+ fieldNames[i] = fieldType.getName();
+ seaTunnelDataTypes[i] = seaTunnelDataType;
+ }
+ return new SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
+ } else {
+ switch (logicalTypeAnnotation.toOriginalType()) {
+ case MAP:
+ GroupType groupType = type.asGroupType().getType(0).asGroupType();
+ SeaTunnelDataType<?> keyType = parquetType2SeaTunnelType(groupType.getType(0));
+ SeaTunnelDataType<?> valueType = parquetType2SeaTunnelType(groupType.getType(1));
+ return new MapType<>(keyType, valueType);
+ case LIST:
+ Type elementType = type.asGroupType().getType(0).asGroupType().getType(0);
+ SeaTunnelDataType<?> fieldType = parquetType2SeaTunnelType(elementType);
+ switch (fieldType.getSqlType()) {
+ case STRING:
+ return ArrayType.STRING_ARRAY_TYPE;
+ case BOOLEAN:
+ return ArrayType.BOOLEAN_ARRAY_TYPE;
+ case TINYINT:
+ return ArrayType.BYTE_ARRAY_TYPE;
+ case SMALLINT:
+ return ArrayType.SHORT_ARRAY_TYPE;
+ case INT:
+ return ArrayType.INT_ARRAY_TYPE;
+ case BIGINT:
+ return ArrayType.LONG_ARRAY_TYPE;
+ case FLOAT:
+ return ArrayType.FLOAT_ARRAY_TYPE;
+ case DOUBLE:
+ return ArrayType.DOUBLE_ARRAY_TYPE;
+ default:
+ String errorMsg = String.format("SeaTunnel array type not supported this genericType [%s] yet", fieldType);
+ throw new UnsupportedOperationException(errorMsg);
+ }
+ default:
+ throw new UnsupportedOperationException("SeaTunnel file connector not support this nest type");
+ }
+ }
+ }
}
@SuppressWarnings("checkstyle:MagicNumber")
@@ -177,10 +351,4 @@
throw new RuntimeException(errorMsg, e);
}
}
-
- private String array2String(ArrayList<GenericData.Record> data) throws JsonProcessingException {
- ObjectMapper objectMapper = new ObjectMapper();
- List<String> values = data.stream().map(record -> record.get(0).toString()).collect(Collectors.toList());
- return objectMapper.writeValueAsString(values);
- }
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
new file mode 100644
index 0000000..c86d54e
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.writer;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
+
+import org.junit.jupiter.api.Test;
+
+import java.net.URL;
+import java.nio.file.Paths;
+
+public class ParquetReadStrategyTest {
+ @Test
+ public void testParquetRead() throws Exception {
+ URL resource = ParquetReadStrategyTest.class.getResource("/test.parquet");
+ assert resource != null;
+ String path = Paths.get(resource.toURI()).toString();
+ ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
+ parquetReadStrategy.init(null);
+ SeaTunnelRowType seaTunnelRowTypeInfo = parquetReadStrategy.getSeaTunnelRowTypeInfo(null, path);
+ assert seaTunnelRowTypeInfo != null;
+ TestCollector testCollector = new TestCollector();
+ parquetReadStrategy.read(path, testCollector);
+ }
+
+ public static class TestCollector implements Collector<SeaTunnelRow> {
+
+ @SuppressWarnings("checkstyle:RegexpSingleline")
+ @Override
+ public void collect(SeaTunnelRow record) {
+ System.out.println(record);
+ }
+
+ @Override
+ public Object getCheckpointLock() {
+ return null;
+ }
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test.parquet b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test.parquet
new file mode 100644
index 0000000..123ef52
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test.parquet
Binary files differ
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml
index 6241a81..94bd660 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml
@@ -29,6 +29,10 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>connector-file-ftp</artifactId>
+ <properties>
+ <connector.name>file.ftp</connector.name>
+ </properties>
+
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/pom.xml
index 330d7c6..dc59fe9 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/pom.xml
@@ -29,6 +29,10 @@
<artifactId>connector-file-hadoop</artifactId>
+ <properties>
+ <connector.name>file.hadoop</connector.name>
+ </properties>
+
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
@@ -40,4 +44,5 @@
<artifactId>flink-shaded-hadoop-2</artifactId>
</dependency>
</dependencies>
+
</project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml
index 58b9d01..93f000f 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/pom.xml
@@ -29,6 +29,10 @@
<artifactId>connector-file-local</artifactId>
+ <properties>
+ <connector.name>file.local</connector.name>
+ </properties>
+
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
@@ -40,4 +44,5 @@
<artifactId>flink-shaded-hadoop-2</artifactId>
</dependency>
</dependencies>
+
</project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-oss/pom.xml
index 0a0ae91..4e2ca4d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/pom.xml
@@ -30,6 +30,7 @@
<artifactId>connector-file-oss</artifactId>
<properties>
<hadoop-aliyun.version>2.9.2</hadoop-aliyun.version>
+ <connector.name>file.oss</connector.name>
</properties>
<dependencies>
diff --git a/seatunnel-connectors-v2/connector-file/pom.xml b/seatunnel-connectors-v2/connector-file/pom.xml
index 00a3cc1..f71d739 100644
--- a/seatunnel-connectors-v2/connector-file/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/pom.xml
@@ -36,4 +36,46 @@
<module>connector-file-oss</module>
<module>connector-file-ftp</module>
</modules>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <relocations>
+ <relocation>
+ <pattern>org.apache.avro</pattern>
+                                <!--suppress UnresolvedMavenProperty; this property is defined by each submodule-->
+ <shadedPattern>${seatunnel.shade.package}.${connector.name}.org.apache.avro</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>org.apache.parquet</pattern>
+ <!--suppress UnresolvedMavenProperty -->
+ <shadedPattern>${seatunnel.shade.package}.${connector.name}.org.apache.parquet</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>shaded.parquet</pattern>
+ <!--suppress UnresolvedMavenProperty -->
+ <shadedPattern>${seatunnel.shade.package}.${connector.name}.shaded.parquet</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ <!-- make sure that flatten runs after maven-shade-plugin -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>flatten-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
</project>
\ No newline at end of file