[Bug] Fix OrcWriteStrategy/ParquetWriteStrategy not logging in with Kerberos (#6472)

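Root cause: OrcWriteStrategy and ParquetWriteStrategy created their writers
directly from a Path/Configuration, bypassing HadoopFileSystemProxy, so the
Kerberos login performed by the proxy never applied to the actual write path.
This patch injects the proxy's authenticated FileSystem into the ORC writer
options and wraps Parquet writer creation in the proxy's Hadoop-auth callback.
A rough sketch of the intended call pattern (names are taken from the diff
below; buildParquetWriter is a hypothetical stand-in for the builder chain
shown in ParquetWriteStrategy):

    // ORC: reuse the proxy's (possibly Kerberos-authenticated) FileSystem
    OrcFile.WriterOptions options =
            OrcFile.writerOptions(getConfiguration(hadoopConf))
                    .fileSystem(hadoopFileSystemProxy.getFileSystem());

    // Parquet: build the writer inside the authenticated context
    ParquetWriter<GenericRecord> writer =
            hadoopFileSystemProxy.doWithHadoopAuth(
                    (conf, ugi) -> buildParquetWriter(path, conf));
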
diff --git a/release-note.md b/release-note.md
index 6c32586..9b41cc9 100644
--- a/release-note.md
+++ b/release-note.md
@@ -52,6 +52,7 @@
 - [Connector-v2] [Clickhouse] fix get clickhouse local table name with closing bracket from distributed table engineFull (#4710)
 - [Connector-v2] [CDC] Fix jdbc connection leak for mysql (#5037)
 - [Connector-v2] [File] Fix WriteStrategy parallel writing thread unsafe issue #5546
+- [Connector-v2] [File] Inject FileSystem into OrcWriteStrategy (#6472)
 - [Connector-v2] [File] Support assign encoding for file source/sink (#5973)
 
 ### Zeta(ST-Engine)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
index b60ab47..11bbe4d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.file.config;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.Path;
 
 import lombok.Data;
@@ -26,6 +27,11 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
+import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
+import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
+import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
+
 @Data
 public class HadoopConf implements Serializable {
     private static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem";
@@ -60,4 +66,16 @@
             configuration.addResource(new Path(hdfsSitePath));
         }
     }
+
+    public Configuration toConfiguration() {
+        Configuration configuration = new Configuration();
+        configuration.setBoolean(READ_INT96_AS_FIXED, true);
+        configuration.setBoolean(WRITE_FIXED_AS_INT96, true);
+        configuration.setBoolean(ADD_LIST_ELEMENT_RECORDS, false);
+        configuration.setBoolean(WRITE_OLD_LIST_STRUCTURE, true);
+        configuration.setBoolean(String.format("fs.%s.impl.disable.cache", getSchema()), true);
+        configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, getHdfsNameKey());
+        configuration.set(String.format("fs.%s.impl", getSchema()), getFsHdfsImpl());
+        return configuration;
+    }
 }
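
The new HadoopConf#toConfiguration() centralizes the Configuration setup that
was previously duplicated in HadoopFileSystemProxy and AbstractWriteStrategy.
Note that it standardizes WRITE_OLD_LIST_STRUCTURE to true, which
AbstractWriteStrategy previously set to false. A minimal usage sketch,
assuming HadoopConf exposes a constructor taking the default FS URI (the
namenode address below is illustrative):

    HadoopConf hadoopConf = new HadoopConf("hdfs://namenode:8020");
    Configuration configuration = hadoopConf.toConfiguration();
    // fs.defaultFS, fs.hdfs.impl and the Parquet/Avro flags are now set
    String defaultFs = configuration.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY);
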
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
index 61f4520..79df7ef 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
@@ -23,7 +23,6 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -43,11 +42,6 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
-import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
-import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
-import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
-
 @Slf4j
 public class HadoopFileSystemProxy implements Serializable, Closeable {
 
@@ -64,30 +58,19 @@
     }
 
     public boolean fileExist(@NonNull String filePath) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
-        Path fileName = new Path(filePath);
-        return fileSystem.exists(fileName);
+        return getFileSystem().exists(new Path(filePath));
     }
 
     public void createFile(@NonNull String filePath) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
-        Path path = new Path(filePath);
-        if (!fileSystem.createNewFile(path)) {
+        if (!getFileSystem().createNewFile(new Path(filePath))) {
             throw CommonError.fileOperationFailed("SeaTunnel", "create", filePath);
         }
     }
 
     public void deleteFile(@NonNull String filePath) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
         Path path = new Path(filePath);
-        if (fileSystem.exists(path)) {
-            if (!fileSystem.delete(path, true)) {
+        if (getFileSystem().exists(path)) {
+            if (!getFileSystem().delete(path, true)) {
                 throw CommonError.fileOperationFailed("SeaTunnel", "delete", filePath);
             }
         }
@@ -98,9 +81,6 @@
             @NonNull String newFilePath,
             boolean removeWhenNewFilePathExist)
             throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
         Path oldPath = new Path(oldFilePath);
         Path newPath = new Path(newFilePath);
 
@@ -116,7 +96,7 @@
 
         if (removeWhenNewFilePathExist) {
             if (fileExist(newFilePath)) {
-                fileSystem.delete(newPath, true);
+                getFileSystem().delete(newPath, true);
                 log.info("Delete already file: {}", newPath);
             }
         }
@@ -124,7 +104,7 @@
             createDir(newPath.getParent().toString());
         }
 
-        if (fileSystem.rename(oldPath, newPath)) {
+        if (getFileSystem().rename(oldPath, newPath)) {
             log.info("rename file :[" + oldPath + "] to [" + newPath + "] finish");
         } else {
             throw CommonError.fileOperationFailed(
@@ -133,26 +113,20 @@
     }
 
     public void createDir(@NonNull String filePath) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
         Path dfs = new Path(filePath);
-        if (!fileSystem.mkdirs(dfs)) {
+        if (!getFileSystem().mkdirs(dfs)) {
             throw CommonError.fileOperationFailed("SeaTunnel", "create", filePath);
         }
     }
 
     public List<LocatedFileStatus> listFile(String path) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
         List<LocatedFileStatus> fileList = new ArrayList<>();
         if (!fileExist(path)) {
             return fileList;
         }
         Path fileName = new Path(path);
         RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator =
-                fileSystem.listFiles(fileName, false);
+                getFileSystem().listFiles(fileName, false);
         while (locatedFileStatusRemoteIterator.hasNext()) {
             fileList.add(locatedFileStatusRemoteIterator.next());
         }
@@ -160,15 +134,12 @@
     }
 
     public List<Path> getAllSubFiles(@NonNull String filePath) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
         List<Path> pathList = new ArrayList<>();
         if (!fileExist(filePath)) {
             return pathList;
         }
         Path fileName = new Path(filePath);
-        FileStatus[] status = fileSystem.listStatus(fileName);
+        FileStatus[] status = getFileSystem().listStatus(fileName);
         if (status != null) {
             for (FileStatus fileStatus : status) {
                 if (fileStatus.isDirectory()) {
@@ -180,31 +151,26 @@
     }
 
     public FileStatus[] listStatus(String filePath) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
-        return fileSystem.listStatus(new Path(filePath));
+        return getFileSystem().listStatus(new Path(filePath));
     }
 
     public FileStatus getFileStatus(String filePath) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
-        return fileSystem.getFileStatus(new Path(filePath));
+        return getFileSystem().getFileStatus(new Path(filePath));
     }
 
     public FSDataOutputStream getOutputStream(String filePath) throws IOException {
-        if (fileSystem == null) {
-            initialize();
-        }
-        return fileSystem.create(new Path(filePath), true);
+        return getFileSystem().create(new Path(filePath), true);
     }
 
     public FSDataInputStream getInputStream(String filePath) throws IOException {
+        return getFileSystem().open(new Path(filePath));
+    }
+
+    public FileSystem getFileSystem() {
         if (fileSystem == null) {
             initialize();
         }
-        return fileSystem.open(new Path(filePath));
+        return fileSystem;
     }
 
     @SneakyThrows
@@ -258,16 +224,7 @@
     }
 
     private Configuration createConfiguration() {
-        Configuration configuration = new Configuration();
-        configuration.setBoolean(READ_INT96_AS_FIXED, true);
-        configuration.setBoolean(WRITE_FIXED_AS_INT96, true);
-        configuration.setBoolean(ADD_LIST_ELEMENT_RECORDS, false);
-        configuration.setBoolean(WRITE_OLD_LIST_STRUCTURE, true);
-        configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey());
-        configuration.setBoolean(
-                String.format("fs.%s.impl.disable.cache", hadoopConf.getSchema()), true);
-        configuration.set(
-                String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getFsHdfsImpl());
+        Configuration configuration = hadoopConf.toConfiguration();
         hadoopConf.setExtraOptionsForConfiguration(configuration);
         return configuration;
     }
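
With the per-method null checks collapsed into the lazy getFileSystem()
accessor, external callers can reuse the proxy's authenticated FileSystem
instead of resolving one themselves. A small sketch, assuming an initialized
proxy (the target path is illustrative):

    FileSystem fs = hadoopFileSystemProxy.getFileSystem();
    try (FSDataOutputStream out = fs.create(new Path("/tmp/seatunnel-demo.txt"), true)) {
        out.writeUTF("hello"); // placeholder payload
    }
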
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index ab88b22..6847648 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -36,7 +36,6 @@
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,11 +57,6 @@
 import java.util.regex.Matcher;
 import java.util.stream.Collectors;
 
-import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
-import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
-import static org.apache.parquet.avro.AvroWriteSupport.WRITE_FIXED_AS_INT96;
-import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
-
 public abstract class AbstractWriteStrategy implements WriteStrategy {
     protected final Logger log = LoggerFactory.getLogger(this.getClass());
     protected final FileSinkConfig fileSinkConfig;
@@ -148,14 +142,7 @@
      */
     @Override
     public Configuration getConfiguration(HadoopConf hadoopConf) {
-        Configuration configuration = new Configuration();
-        configuration.setBoolean(READ_INT96_AS_FIXED, true);
-        configuration.setBoolean(WRITE_FIXED_AS_INT96, true);
-        configuration.setBoolean(ADD_LIST_ELEMENT_RECORDS, false);
-        configuration.setBoolean(WRITE_OLD_LIST_STRUCTURE, false);
-        configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey());
-        configuration.set(
-                String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getFsHdfsImpl());
+        Configuration configuration = hadoopConf.toConfiguration();
         this.hadoopConf.setExtraOptionsForConfiguration(configuration);
         return configuration;
     }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
index 3a800b9..79ca193 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
@@ -122,6 +122,7 @@
                                 .compress(compressFormat.getOrcCompression())
                                 // use orc version 0.12
                                 .version(OrcFile.Version.V_0_12)
+                                .fileSystem(hadoopFileSystemProxy.getFileSystem())
                                 .overwrite(true);
                 Writer newWriter = OrcFile.createWriter(path, options);
                 this.beingWrittenWriter.put(filePath, newWriter);
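
Without an injected FileSystem, OrcFile.createWriter resolves one from the
Path and Configuration on its own, which sidesteps the proxy's Kerberos
login; passing hadoopFileSystemProxy.getFileSystem() keeps ORC I/O on the
authenticated connection. A condensed sketch of the resulting builder chain
(buildSchemaWithRowType is a hypothetical stand-in for however the
surrounding code derives the ORC schema):

    OrcFile.WriterOptions options =
            OrcFile.writerOptions(getConfiguration(hadoopConf))
                    .setSchema(buildSchemaWithRowType())
                    .compress(compressFormat.getOrcCompression())
                    .version(OrcFile.Version.V_0_12)
                    .fileSystem(hadoopFileSystemProxy.getFileSystem())
                    .overwrite(true);
    Writer newWriter = OrcFile.createWriter(path, options);
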
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
index 84ad0a7..95343ae 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
@@ -139,25 +139,33 @@
         dataModel.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
         if (writer == null) {
             Path path = new Path(filePath);
-            try {
-                HadoopOutputFile outputFile =
-                        HadoopOutputFile.fromPath(path, getConfiguration(hadoopConf));
-                ParquetWriter<GenericRecord> newWriter =
-                        AvroParquetWriter.<GenericRecord>builder(outputFile)
-                                .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
-                                .withDataModel(dataModel)
-                                // use parquet v1 to improve compatibility
-                                .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
-                                .withCompressionCodec(compressFormat.getParquetCompression())
-                                .withSchema(schema)
-                                .build();
-                this.beingWrittenWriter.put(filePath, newWriter);
-                return newWriter;
-            } catch (IOException e) {
-                String errorMsg = String.format("Get parquet writer for file [%s] error", filePath);
-                throw new FileConnectorException(
-                        CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, errorMsg, e);
-            }
+            // perform the Kerberos login (when configured) before creating the writer
+            return hadoopFileSystemProxy.doWithHadoopAuth(
+                    (configuration, userGroupInformation) -> {
+                        try {
+                            HadoopOutputFile outputFile =
+                                    HadoopOutputFile.fromPath(path, getConfiguration(hadoopConf));
+                            ParquetWriter<GenericRecord> newWriter =
+                                    AvroParquetWriter.<GenericRecord>builder(outputFile)
+                                            .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+                                            .withDataModel(dataModel)
+                                            // use parquet v1 to improve compatibility
+                                            .withWriterVersion(
+                                                    ParquetProperties.WriterVersion.PARQUET_1_0)
+                                            .withCompressionCodec(
+                                                    compressFormat.getParquetCompression())
+                                            .withSchema(schema)
+                                            .build();
+                            this.beingWrittenWriter.put(filePath, newWriter);
+                            return newWriter;
+                        } catch (IOException e) {
+                            String errorMsg =
+                                    String.format(
+                                            "Get parquet writer for file [%s] error", filePath);
+                            throw new FileConnectorException(
+                                    CommonErrorCodeDeprecated.WRITER_OPERATION_FAILED, errorMsg, e);
+                        }
+                    });
         }
         return writer;
     }
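
doWithHadoopAuth is used above as a callback runner that performs the
(optional) Kerberos login before executing the supplied action; its
implementation is not part of this diff. A minimal sketch of what such a
runner could look like with Hadoop's UserGroupInformation, offered purely as
an assumption about its shape (enableKerberos, kerberosPrincipal and
kerberosKeytabPath are hypothetical field names):

    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;
    import java.util.function.BiFunction;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.UserGroupInformation;

    public <T> T doWithHadoopAuth(BiFunction<Configuration, UserGroupInformation, T> action)
            throws IOException, InterruptedException {
        Configuration configuration = createConfiguration();
        if (enableKerberos) {
            UserGroupInformation.setConfiguration(configuration);
            UserGroupInformation ugi =
                    UserGroupInformation.loginUserFromKeytabAndReturnUGI(
                            kerberosPrincipal, kerberosKeytabPath);
            // run the action as the authenticated principal
            return ugi.doAs(
                    (PrivilegedExceptionAction<T>) () -> action.apply(configuration, ugi));
        }
        return action.apply(configuration, UserGroupInformation.getCurrentUser());
    }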