[Bug] Fix OrcWriteStrategy/ParquetWriteStrategy not logging in with Kerberos (#6472)
diff --git a/release-note.md b/release-note.md
index 6c32586..9b41cc9 100644
--- a/release-note.md
+++ b/release-note.md
@@ -52,6 +52,7 @@
- [Connector-v2] [Clickhouse] fix get clickhouse local table name with closing bracket from distributed table engineFull (#4710)
- [Connector-v2] [CDC] Fix jdbc connection leak for mysql (#5037)
- [Connector-v2] [File] Fix WriteStrategy parallel writing thread unsafe issue #5546
+- [Connector-v2] [File] Inject FileSystem into OrcWriteStrategy
- [Connector-v2] [File] Support assign encoding for file source/sink (#5973)
### Zeta(ST-Engine)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
index b60ab47..11bbe4d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.file.config;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.Path;
import lombok.Data;
@@ -26,6 +27,11 @@
import java.util.HashMap;
import java.util.Map;
+import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
+import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
+import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
+import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
+
@Data
public class HadoopConf implements Serializable {
private static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem";
@@ -60,4 +66,16 @@
configuration.addResource(new Path(hdfsSitePath));
}
}
+
+ public Configuration toConfiguration() {
+ Configuration configuration = new Configuration();
+ configuration.setBoolean(READ_INT96_AS_FIXED, true);
+ configuration.setBoolean(WRITE_FIXED_AS_INT96, true);
+ configuration.setBoolean(ADD_LIST_ELEMENT_RECORDS, false);
+ configuration.setBoolean(WRITE_OLD_LIST_STRUCTURE, true);
+ configuration.setBoolean(String.format("fs.%s.impl.disable.cache", getSchema()), true);
+ configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, getHdfsNameKey());
+ configuration.set(String.format("fs.%s.impl", getSchema()), getFsHdfsImpl());
+ return configuration;
+ }
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
index 61f4520..79df7ef 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
@@ -23,7 +23,6 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -43,11 +42,6 @@
import java.util.ArrayList;
import java.util.List;
-import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
-import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
-import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
-import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
-
@Slf4j
public class HadoopFileSystemProxy implements Serializable, Closeable {
@@ -64,30 +58,19 @@
}
public boolean fileExist(@NonNull String filePath) throws IOException {
- if (fileSystem == null) {
- initialize();
- }
- Path fileName = new Path(filePath);
- return fileSystem.exists(fileName);
+ return getFileSystem().exists(new Path(filePath));
}
public void createFile(@NonNull String filePath) throws IOException {
- if (fileSystem == null) {
- initialize();
- }
- Path path = new Path(filePath);
- if (!fileSystem.createNewFile(path)) {
+ if (!getFileSystem().createNewFile(new Path(filePath))) {
throw CommonError.fileOperationFailed("SeaTunnel", "create", filePath);
}
}
public void deleteFile(@NonNull String filePath) throws IOException {
- if (fileSystem == null) {
- initialize();
- }
Path path = new Path(filePath);
- if (fileSystem.exists(path)) {
- if (!fileSystem.delete(path, true)) {
+ if (getFileSystem().exists(path)) {
+ if (!getFileSystem().delete(path, true)) {
throw CommonError.fileOperationFailed("SeaTunnel", "delete", filePath);
}
}
@@ -98,9 +81,6 @@
@NonNull String newFilePath,
boolean removeWhenNewFilePathExist)
throws IOException {
- if (fileSystem == null) {
- initialize();
- }
Path oldPath = new Path(oldFilePath);
Path newPath = new Path(newFilePath);
@@ -116,7 +96,7 @@
if (removeWhenNewFilePathExist) {
if (fileExist(newFilePath)) {
- fileSystem.delete(newPath, true);
+ getFileSystem().delete(newPath, true);
log.info("Delete already file: {}", newPath);
}
}
@@ -124,7 +104,7 @@
createDir(newPath.getParent().toString());
}
- if (fileSystem.rename(oldPath, newPath)) {
+ if (getFileSystem().rename(oldPath, newPath)) {
log.info("rename file :[" + oldPath + "] to [" + newPath + "] finish");
} else {
throw CommonError.fileOperationFailed(
@@ -133,26 +113,20 @@
}
public void createDir(@NonNull String filePath) throws IOException {
- if (fileSystem == null) {
- initialize();
- }
Path dfs = new Path(filePath);
- if (!fileSystem.mkdirs(dfs)) {
+ if (!getFileSystem().mkdirs(dfs)) {
throw CommonError.fileOperationFailed("SeaTunnel", "create", filePath);
}
}
public List<LocatedFileStatus> listFile(String path) throws IOException {
- if (fileSystem == null) {
- initialize();
- }
List<LocatedFileStatus> fileList = new ArrayList<>();
if (!fileExist(path)) {
return fileList;
}
Path fileName = new Path(path);
RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator =
- fileSystem.listFiles(fileName, false);
+ getFileSystem().listFiles(fileName, false);
while (locatedFileStatusRemoteIterator.hasNext()) {
fileList.add(locatedFileStatusRemoteIterator.next());
}
@@ -160,15 +134,12 @@
}
public List<Path> getAllSubFiles(@NonNull String filePath) throws IOException {
- if (fileSystem == null) {
- initialize();
- }
List<Path> pathList = new ArrayList<>();
if (!fileExist(filePath)) {
return pathList;
}
Path fileName = new Path(filePath);
- FileStatus[] status = fileSystem.listStatus(fileName);
+ FileStatus[] status = getFileSystem().listStatus(fileName);
if (status != null) {
for (FileStatus fileStatus : status) {
if (fileStatus.isDirectory()) {
@@ -180,31 +151,26 @@
}
public FileStatus[] listStatus(String filePath) throws IOException {
- if (fileSystem == null) {
- initialize();
- }
- return fileSystem.listStatus(new Path(filePath));
+ return getFileSystem().listStatus(new Path(filePath));
}
public FileStatus getFileStatus(String filePath) throws IOException {
- if (fileSystem == null) {
- initialize();
- }
- return fileSystem.getFileStatus(new Path(filePath));
+ return getFileSystem().getFileStatus(new Path(filePath));
}
public FSDataOutputStream getOutputStream(String filePath) throws IOException {
- if (fileSystem == null) {
- initialize();
- }
- return fileSystem.create(new Path(filePath), true);
+ return getFileSystem().create(new Path(filePath), true);
}
public FSDataInputStream getInputStream(String filePath) throws IOException {
+ return getFileSystem().open(new Path(filePath));
+ }
+
+ public FileSystem getFileSystem() {
if (fileSystem == null) {
initialize();
}
- return fileSystem.open(new Path(filePath));
+ return fileSystem;
}
@SneakyThrows
@@ -258,16 +224,7 @@
}
private Configuration createConfiguration() {
- Configuration configuration = new Configuration();
- configuration.setBoolean(READ_INT96_AS_FIXED, true);
- configuration.setBoolean(WRITE_FIXED_AS_INT96, true);
- configuration.setBoolean(ADD_LIST_ELEMENT_RECORDS, false);
- configuration.setBoolean(WRITE_OLD_LIST_STRUCTURE, true);
- configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey());
- configuration.setBoolean(
- String.format("fs.%s.impl.disable.cache", hadoopConf.getSchema()), true);
- configuration.set(
- String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getFsHdfsImpl());
+ Configuration configuration = hadoopConf.toConfiguration();
hadoopConf.setExtraOptionsForConfiguration(configuration);
return configuration;
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index ab88b22..6847648 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -36,7 +36,6 @@
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,11 +57,6 @@
import java.util.regex.Matcher;
import java.util.stream.Collectors;
-import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
-import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
-import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
-import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
-
public abstract class AbstractWriteStrategy implements WriteStrategy {
protected final Logger log = LoggerFactory.getLogger(this.getClass());
protected final FileSinkConfig fileSinkConfig;
@@ -148,14 +142,7 @@
*/
@Override
public Configuration getConfiguration(HadoopConf hadoopConf) {
- Configuration configuration = new Configuration();
- configuration.setBoolean(READ_INT96_AS_FIXED, true);
- configuration.setBoolean(WRITE_FIXED_AS_INT96, true);
- configuration.setBoolean(ADD_LIST_ELEMENT_RECORDS, false);
- configuration.setBoolean(WRITE_OLD_LIST_STRUCTURE, false);
- configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey());
- configuration.set(
- String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getFsHdfsImpl());
+ Configuration configuration = hadoopConf.toConfiguration();
this.hadoopConf.setExtraOptionsForConfiguration(configuration);
return configuration;
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
index 3a800b9..79ca193 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
@@ -122,6 +122,7 @@
.compress(compressFormat.getOrcCompression())
// use orc version 0.12
.version(OrcFile.Version.V_0_12)
+ .fileSystem(hadoopFileSystemProxy.getFileSystem())
.overwrite(true);
Writer newWriter = OrcFile.createWriter(path, options);
this.beingWrittenWriter.put(filePath, newWriter);
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
index 84ad0a7..95343ae 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
@@ -139,25 +139,33 @@
dataModel.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
if (writer == null) {
Path path = new Path(filePath);
- try {
- HadoopOutputFile outputFile =
- HadoopOutputFile.fromPath(path, getConfiguration(hadoopConf));
- ParquetWriter<GenericRecord> newWriter =
- AvroParquetWriter.<GenericRecord>builder(outputFile)
- .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
- .withDataModel(dataModel)
- // use parquet v1 to improve compatibility
- .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
- .withCompressionCodec(compressFormat.getParquetCompression())
- .withSchema(schema)
- .build();
- this.beingWrittenWriter.put(filePath, newWriter);
- return newWriter;
- } catch (IOException e) {
- String errorMsg = String.format("Get parquet writer for file [%s] error", filePath);
- throw new FileConnectorException(
- CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, errorMsg, e);
- }
+ // initialize the kerberos login
+ return hadoopFileSystemProxy.doWithHadoopAuth(
+ (configuration, userGroupInformation) -> {
+ try {
+ HadoopOutputFile outputFile =
+ HadoopOutputFile.fromPath(path, getConfiguration(hadoopConf));
+ ParquetWriter<GenericRecord> newWriter =
+ AvroParquetWriter.<GenericRecord>builder(outputFile)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ .withDataModel(dataModel)
+ // use parquet v1 to improve compatibility
+ .withWriterVersion(
+ ParquetProperties.WriterVersion.PARQUET_1_0)
+ .withCompressionCodec(
+ compressFormat.getParquetCompression())
+ .withSchema(schema)
+ .build();
+ this.beingWrittenWriter.put(filePath, newWriter);
+ return newWriter;
+ } catch (IOException e) {
+ String errorMsg =
+ String.format(
+ "Get parquet writer for file [%s] error", filePath);
+ throw new FileConnectorException(
+ CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, errorMsg, e);
+ }
+ });
}
return writer;
}