AMBARI-24833. HDFS client kerberos support + small fixes (#27)

* AMBARI-24833. HDFS client kerberos support + small fixes

* AMBARI-24833. Fix principal description
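
Example logfeeder.properties entries for the new Kerberos support (a minimal sketch: the property names come from the constants added in this change, the keytab default and principal example from the property descriptions below, and "_HOST" in the principal is substituted with the local hostname at login time):

  logfeeder.hdfs.kerberos=true
  logfeeder.hdfs.keytab=/etc/security/keytabs/logfeeder.service.keytab
  logfeeder.hdfs.principal=mylogfeeder/myhost1@EXAMPLE.COM
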
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
index f9ef32d..a15ac74 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
@@ -132,6 +132,8 @@
   public static final String HDFS_PORT = "logfeeder.hdfs.port";
   public static final String HDFS_FILE_PERMISSIONS = "logfeeder.hdfs.file.permissions";
   public static final String HDFS_KERBEROS = "logfeeder.hdfs.kerberos";
+  public static final String HDFS_KERBEROS_KEYTAB = "logfeeder.hdfs.keytab";
+  public static final String HDFS_KERBEROS_PRINCIPAL = "logfeeder.hdfs.principal";
 
   public static final String S3_ENDPOINT = "logfeeder.s3.endpoint";
   public static final String S3_ENDPOINT_DEFAULT = "https://s3.amazonaws.com";
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
index f2eb6c7..b6ab4c7 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
@@ -19,7 +19,7 @@
 package org.apache.ambari.logfeeder.conf;
 
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
-import org.apache.ambari.logfeeder.conf.output.ExternalHdfsOutputConfig;
+import org.apache.ambari.logfeeder.conf.output.HdfsOutputConfig;
 import org.apache.ambari.logfeeder.conf.output.RolloverConfig;
 import org.apache.ambari.logfeeder.conf.output.S3OutputConfig;
 import org.apache.ambari.logfeeder.plugin.common.LogFeederProperties;
@@ -53,7 +53,7 @@
   private S3OutputConfig s3OutputConfig;
 
   @Inject
-  private ExternalHdfsOutputConfig hdfsOutputConfig;
+  private HdfsOutputConfig hdfsOutputConfig;
 
   private Properties properties;
 
@@ -258,7 +258,7 @@
     defaultValue = "false",
     sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
   )
-  @Value("${" + LogFeederConstants.CLOUD_STORAGE_USE_HDFS_CLIENT + ":false}")
+  @Value("${" + LogFeederConstants.CLOUD_STORAGE_USE_HDFS_CLIENT + ":true}")
   private boolean useCloudHdfsClient;
 
   @LogSearchPropertyDescription(
@@ -281,15 +281,6 @@
   private String cloudBasePath;
 
   @LogSearchPropertyDescription(
-    name = LogFeederConstants.HDFS_USER,
-    description = "Overrides HADOOP_USER_NAME variable at runtime",
-    examples = {"hdfs"},
-    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
-  )
-  @Value("${"+ LogFeederConstants.HDFS_USER + ":}")
-  private String logfeederHdfsUser;
-
-  @LogSearchPropertyDescription(
     name = LogFeederConstants.CLOUD_STORAGE_USE_FILTERS,
     description = "Use filters for inputs (with filters the output format will be JSON)",
     examples = {"true"},
@@ -460,7 +451,7 @@
     this.cloudStorageMode = cloudStorageMode;
   }
 
-  public ExternalHdfsOutputConfig getHdfsOutputConfig() {
+  public HdfsOutputConfig getHdfsOutputConfig() {
     return hdfsOutputConfig;
   }
 
@@ -480,7 +471,7 @@
     this.rolloverConfig = rolloverConfig;
   }
 
-  public void setHdfsOutputConfig(ExternalHdfsOutputConfig hdfsOutputConfig) {
+  public void setHdfsOutputConfig(HdfsOutputConfig hdfsOutputConfig) {
     this.hdfsOutputConfig = hdfsOutputConfig;
   }
 
@@ -512,14 +503,6 @@
     return useCloudHdfsClient;
   }
 
-  public String getLogfeederHdfsUser() {
-    return logfeederHdfsUser;
-  }
-
-  public void setLogfeederHdfsUser(String logfeederHdfsUser) {
-    this.logfeederHdfsUser = logfeederHdfsUser;
-  }
-
   public void setUseCloudHdfsClient(boolean useCloudHdfsClient) {
     this.useCloudHdfsClient = useCloudHdfsClient;
   }
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/ExternalHdfsOutputConfig.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/ExternalHdfsOutputConfig.java
deleted file mode 100644
index fbbf869..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/ExternalHdfsOutputConfig.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.conf.output;
-
-import org.apache.ambari.logfeeder.common.LogFeederConstants;
-import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Configuration;
-
-@Configuration
-public class ExternalHdfsOutputConfig {
-
-  @LogSearchPropertyDescription(
-    name = LogFeederConstants.HDFS_HOST,
-    description = "HDFS Name Node host.",
-    examples = {"mynamenodehost"},
-    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
-  )
-  @Value("${"+ LogFeederConstants.HDFS_HOST + ":}")
-  private String hdfsHost;
-
-  @LogSearchPropertyDescription(
-    name = LogFeederConstants.HDFS_PORT,
-    description = "HDFS Name Node port",
-    examples = {"9000"},
-    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
-  )
-  @Value("${"+ LogFeederConstants.HDFS_PORT + ":}")
-  private Integer hdfsPort;
-
-  @LogSearchPropertyDescription(
-    name = LogFeederConstants.HDFS_FILE_PERMISSIONS,
-    description = "Default permissions for created files on HDFS",
-    examples = {"600"},
-    defaultValue = "640",
-    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
-  )
-  @Value("${"+ LogFeederConstants.HDFS_FILE_PERMISSIONS + ":640}")
-  private String hdfsFilePermissions;
-
-  @LogSearchPropertyDescription(
-    name = LogFeederConstants.HDFS_KERBEROS,
-    description = "Enable kerberos support for HDFS",
-    examples = {"true"},
-    defaultValue = "false",
-    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
-  )
-  @Value("${"+ LogFeederConstants.HDFS_KERBEROS + ":false}")
-  private boolean secure;
-
-  public String getHdfsHost() {
-    return hdfsHost;
-  }
-
-  public void setHdfsHost(String hdfsHost) {
-    this.hdfsHost = hdfsHost;
-  }
-
-  public Integer getHdfsPort() {
-    return hdfsPort;
-  }
-
-  public void setHdfsPort(Integer hdfsPort) {
-    this.hdfsPort = hdfsPort;
-  }
-
-  public String getHdfsFilePermissions() {
-    return hdfsFilePermissions;
-  }
-
-  public void setHdfsFilePermissions(String hdfsFilePermissions) {
-    this.hdfsFilePermissions = hdfsFilePermissions;
-  }
-
-  public boolean isSecure() {
-    return secure;
-  }
-
-  public void setSecure(boolean secure) {
-    this.secure = secure;
-  }
-}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/HdfsOutputConfig.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/HdfsOutputConfig.java
new file mode 100644
index 0000000..312f2f0
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/HdfsOutputConfig.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.conf.output;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class HdfsOutputConfig {
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.HDFS_HOST,
+    description = "HDFS Name Node host.",
+    examples = {"mynamenodehost"},
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.HDFS_HOST + ":}")
+  private String hdfsHost;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.HDFS_PORT,
+    description = "HDFS Name Node port",
+    examples = {"9000"},
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.HDFS_PORT + ":}")
+  private Integer hdfsPort;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.HDFS_FILE_PERMISSIONS,
+    description = "Default permissions for created files on HDFS",
+    examples = {"600"},
+    defaultValue = "640",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.HDFS_FILE_PERMISSIONS + ":640}")
+  private String hdfsFilePermissions;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.HDFS_USER,
+    description = "Overrides HADOOP_USER_NAME variable at runtime",
+    examples = {"hdfs"},
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.HDFS_USER + ":}")
+  private String logfeederHdfsUser;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.HDFS_KERBEROS,
+    description = "Enable kerberos support for HDFS",
+    examples = {"true"},
+    defaultValue = "false",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.HDFS_KERBEROS + ":false}")
+  private boolean hdfsKerberos;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.HDFS_KERBEROS_KEYTAB,
+    description = "Kerberos keytab location for Log Feeder for communicating with secure HDFS. ",
+    examples = {"/etc/security/keytabs/mykeytab.keytab"},
+    defaultValue = "/etc/security/keytabs/logfeeder.service.keytab",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.HDFS_KERBEROS_KEYTAB + ":/etc/security/keytabs/logfeeder.service.keytab}")
+  private String keytab;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.HDFS_KERBEROS_PRINCIPAL,
+    description = "Kerberos principal for Log Feeder for communicating with secure HDFS. ",
+    examples = {"mylogfeeder/myhost1@EXAMPLE.COM"},
+    defaultValue = "logfeeder/_HOST",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.HDFS_KERBEROS_PRINCIPAL + ":logfeeder/_HOST}")
+  private String principal;
+
+  public String getHdfsHost() {
+    return hdfsHost;
+  }
+
+  public void setHdfsHost(String hdfsHost) {
+    this.hdfsHost = hdfsHost;
+  }
+
+  public Integer getHdfsPort() {
+    return hdfsPort;
+  }
+
+  public void setHdfsPort(Integer hdfsPort) {
+    this.hdfsPort = hdfsPort;
+  }
+
+  public String getHdfsFilePermissions() {
+    return hdfsFilePermissions;
+  }
+
+  public void setHdfsFilePermissions(String hdfsFilePermissions) {
+    this.hdfsFilePermissions = hdfsFilePermissions;
+  }
+
+  public String getKeytab() {
+    return keytab;
+  }
+
+  public void setKeytab(String keytab) {
+    this.keytab = keytab;
+  }
+
+  public String getPrincipal() {
+    return principal;
+  }
+
+  public void setPrincipal(String principal) {
+    this.principal = principal;
+  }
+
+  public String getLogfeederHdfsUser() {
+    return logfeederHdfsUser;
+  }
+
+  public void setLogfeederHdfsUser(String logfeederHdfsUser) {
+    this.logfeederHdfsUser = logfeederHdfsUser;
+  }
+
+  public boolean isHdfsKerberos() {
+    return hdfsKerberos;
+  }
+
+  public void setHdfsKerberos(boolean hdfsKerberos) {
+    this.hdfsKerberos = hdfsKerberos;
+  }
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java
index 31bfd0d..d383ed1 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java
@@ -47,7 +47,7 @@
     for (Input input : inputConfigHolder.getInputManager().getInputList(serviceName)) {
       for (FilterDescriptor filterDescriptor : inputConfigHolder.getFilterConfigList()) {
         if (filterDescriptor == null) {
-          logger.warn("Filter descriptor is smpty. Skipping...");
+          logger.warn("Filter descriptor is empty. Skipping...");
           continue;
         }
         if (BooleanUtils.isFalse(filterDescriptor.isEnabled())) {
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java
index ac10b2d..c2e73b7 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java
@@ -52,7 +52,7 @@
     final boolean useFilters = inputConfigHolder.getLogFeederProps().isCloudStorageUseFilters();
     for (InputDescriptor inputDescriptor : inputConfigHolder.getInputConfigList()) {
       if (inputDescriptor == null) {
-        logger.warn("Input descriptor is smpty. Skipping...");
+        logger.warn("Input descriptor is empty. Skipping...");
         continue;
       }
       LogFeederMode mode = inputConfigHolder.getLogFeederProps().getCloudStorageMode();
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
index dd0fe3e..4677461 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
@@ -74,7 +74,7 @@
   private void loadInputs(String serviceName, InputConfigHolder inputConfigHolder) {
     for (InputDescriptor inputDescriptor : inputConfigHolder.getInputConfigList()) {
       if (inputDescriptor == null) {
-        logger.warn("Input descriptor is smpty. Skipping...");
+        logger.warn("Input descriptor is empty. Skipping...");
         continue;
       }
 
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java
index bd9e3df..ff0805d 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java
@@ -46,7 +46,7 @@
     Input input = inputMarker.getInput();
     // Update the block with the context fields
     for (Map.Entry<String, String> entry : input.getInputDescriptor().getAddFields().entrySet()) {
-      if (jsonObj.get(entry.getKey()) == null || entry.getKey().equals("cluster") && "null".equals(jsonObj.get(entry.getKey()))) {
+      if (jsonObj.get(entry.getKey()) == null || "cluster".equals(entry.getKey()) && "null".equals(jsonObj.get(entry.getKey()))) {
         jsonObj.put(entry.getKey(), entry.getValue());
       }
     }
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/ExternalHDFSUploadClient.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/ExternalHDFSUploadClient.java
deleted file mode 100644
index a23a715..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/ExternalHDFSUploadClient.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.output.cloud.upload;
-
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.conf.output.ExternalHdfsOutputConfig;
-import org.apache.ambari.logfeeder.util.LogFeederHDFSUtil;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-/**
- * HDFS (on-prem) specific uploader client that can work with an external HDFS.
- */
-public class ExternalHDFSUploadClient implements UploadClient {
-
-  private static final Logger logger = LogManager.getLogger(ExternalHDFSUploadClient.class);
-
-  private final ExternalHdfsOutputConfig hdfsOutputConfig;
-  private final FsPermission fsPermission;
-  private FileSystem fs;
-
-  public ExternalHDFSUploadClient(ExternalHdfsOutputConfig hdfsOutputConfig) {
-    this.hdfsOutputConfig = hdfsOutputConfig;
-    this.fsPermission = new FsPermission(hdfsOutputConfig.getHdfsFilePermissions());
-  }
-
-  @Override
-  public void init(LogFeederProps logFeederProps) {
-    logger.info("Initialize external HDFS client ...");
-    if (StringUtils.isNotBlank(logFeederProps.getLogfeederHdfsUser())) {
-      logger.info("Using HADOOP_USER_NAME: {}", logFeederProps.getLogfeederHdfsUser());
-      System.setProperty("HADOOP_USER_NAME", logFeederProps.getLogfeederHdfsUser());
-    }
-    this.fs = LogFeederHDFSUtil.buildFileSystem(
-      hdfsOutputConfig.getHdfsHost(),
-      String.valueOf(hdfsOutputConfig.getHdfsPort()));
-    if (logFeederProps.getHdfsOutputConfig().isSecure()) {
-      logger.info("Kerberos is enabled for external HDFS.");
-      Configuration conf = fs.getConf();
-      conf.set("hadoop.security.authentication", "kerberos");
-    }
-  }
-
-  @Override
-  public void upload(String source, String target) throws Exception {
-    LogFeederHDFSUtil.copyFromLocal(source, target, fs, true, true, fsPermission);
-  }
-
-  @Override
-  public void close() {
-    LogFeederHDFSUtil.closeFileSystem(fs);
-  }
-}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
index c2a8497..421c4c5 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
@@ -19,15 +19,17 @@
 package org.apache.ambari.logfeeder.output.cloud.upload;
 
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.conf.output.HdfsOutputConfig;
 import org.apache.ambari.logfeeder.util.LogFeederHDFSUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.io.IOException;
-
 /**
  * HDFS client that uses core-site.xml file from the classpath to load the configuration.
  * Can connect to S3 / GCS / WASB / ADLS if the core-site.xml is configured to use one of those cloud storages
@@ -35,37 +37,64 @@
 public class HDFSUploadClient implements UploadClient {
 
   private static final String FS_DEFAULT_FS = "fs.defaultFS";
+  private static final String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication";
 
   private static final Logger logger = LogManager.getLogger(HDFSUploadClient.class);
 
+  private final boolean externalHdfs;
+  private final HdfsOutputConfig hdfsOutputConfig;
+  private final FsPermission fsPermission;
   private FileSystem fs;
 
+  public HDFSUploadClient(HdfsOutputConfig hdfsOutputConfig, boolean externalHdfs) {
+    this.hdfsOutputConfig = hdfsOutputConfig;
+    this.externalHdfs = externalHdfs;
+    this.fsPermission = new FsPermission(hdfsOutputConfig.getHdfsFilePermissions());
+  }
+
   @Override
   public void init(LogFeederProps logFeederProps) {
-    logger.info("Initialize HDFS client (cloud mode), using core-site.xml from the classpath.");
-    Configuration configuration = new Configuration();
+    final Configuration configuration;
+    if (externalHdfs) {
+      configuration = LogFeederHDFSUtil.buildHdfsConfiguration(hdfsOutputConfig.getHdfsHost(), String.valueOf(hdfsOutputConfig.getHdfsPort()), "hdfs");
+      logger.info("Using external HDFS client as core-site.xml is not located on the classpath.");
+    } else {
+      configuration = new Configuration();
+      logger.info("Initialize HDFS client (cloud mode), using core-site.xml from the classpath.");
+    }
     if (StringUtils.isNotBlank(logFeederProps.getCustomFs())) {
       configuration.set(FS_DEFAULT_FS, logFeederProps.getCustomFs());
     }
-    if (StringUtils.isNotBlank(logFeederProps.getLogfeederHdfsUser()) && isHadoopFileSystem(configuration)) {
-      logger.info("Using HADOOP_USER_NAME: {}", logFeederProps.getLogfeederHdfsUser());
-      System.setProperty("HADOOP_USER_NAME", logFeederProps.getLogfeederHdfsUser());
+    if (hdfsOutputConfig.isHdfsKerberos()) {
+      logger.info("Kerberos is enabled for HDFS.");
+      configuration.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+      final String principal = hdfsOutputConfig.getPrincipal().replace("_HOST", LogFeederUtil.hostName);
+      UserGroupInformation.setConfiguration(configuration);
+      try {
+        UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, hdfsOutputConfig.getKeytab());
+        UserGroupInformation.setLoginUser(ugi);
+      } catch (Exception e) {
+        logger.error("Error during kerberos login", e);
+        throw new RuntimeException(e);
+      }
+    } else {
+      if (StringUtils.isNotBlank(hdfsOutputConfig.getLogfeederHdfsUser())) {
+        logger.info("Using HADOOP_USER_NAME: {}", hdfsOutputConfig.getLogfeederHdfsUser());
+        System.setProperty("HADOOP_USER_NAME", hdfsOutputConfig.getLogfeederHdfsUser());
+      }
     }
+    logger.info("HDFS client - will use '{}' permission for uploaded files", hdfsOutputConfig.getHdfsFilePermissions());
     this.fs = LogFeederHDFSUtil.buildFileSystem(configuration);
   }
 
   @Override
   public void upload(String source, String target) throws Exception {
-    LogFeederHDFSUtil.copyFromLocal(source, target, fs, true, true, null);
+    LogFeederHDFSUtil.copyFromLocal(source, target, fs, true, true, this.fsPermission);
   }
 
   @Override
-  public void close() throws IOException {
+  public void close() {
     LogFeederHDFSUtil.closeFileSystem(fs);
   }
 
-  private boolean isHadoopFileSystem(Configuration conf) {
-    return conf.get(FS_DEFAULT_FS).contains("hdfs://");
-  }
-
 }
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java
index bea2943..27d69c7 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java
@@ -43,11 +43,11 @@
     if (useHdfsClient && checkCoreSiteIsOnClasspath(logFeederProps)) {
       logger.info("The core-site.xml from the classpath will be used to figure it out the cloud output settings.");
       logFeederProps.setCloudStorageDestination(CloudStorageDestination.DEFAULT_FS);
-      return new HDFSUploadClient();
+      return new HDFSUploadClient(logFeederProps.getHdfsOutputConfig(), false);
     }
     else if (CloudStorageDestination.HDFS.equals(destType)) {
       logger.info("External HDFS output will be used.");
-      return new ExternalHDFSUploadClient(logFeederProps.getHdfsOutputConfig());
+      return new HDFSUploadClient(logFeederProps.getHdfsOutputConfig(), true);
     } else if (CloudStorageDestination.S3.equals(destType)) {
       if (useHdfsClient) {
         logger.info("S3 cloud output will be used with HDFS client.");