AMBARI-24833. HDFS client kerberos support + small fixes (#27)
* AMBARI-24833. HDFS client kerberos support + small fixes
* AMBARI-24833. Fix principal description
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
index f9ef32d..a15ac74 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
@@ -132,6 +132,8 @@
public static final String HDFS_PORT = "logfeeder.hdfs.port";
public static final String HDFS_FILE_PERMISSIONS = "logfeeder.hdfs.file.permissions";
public static final String HDFS_KERBEROS = "logfeeder.hdfs.kerberos";
+ public static final String HDFS_KERBEROS_KEYTAB = "logfeeder.hdfs.keytab";
+ public static final String HDFS_KERBEROS_PRINCIPAL = "logfeeder.hdfs.principal";
public static final String S3_ENDPOINT = "logfeeder.s3.endpoint";
public static final String S3_ENDPOINT_DEFAULT = "https://s3.amazonaws.com";
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
index f2eb6c7..b6ab4c7 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
@@ -19,7 +19,7 @@
package org.apache.ambari.logfeeder.conf;
import org.apache.ambari.logfeeder.common.LogFeederConstants;
-import org.apache.ambari.logfeeder.conf.output.ExternalHdfsOutputConfig;
+import org.apache.ambari.logfeeder.conf.output.HdfsOutputConfig;
import org.apache.ambari.logfeeder.conf.output.RolloverConfig;
import org.apache.ambari.logfeeder.conf.output.S3OutputConfig;
import org.apache.ambari.logfeeder.plugin.common.LogFeederProperties;
@@ -53,7 +53,7 @@
private S3OutputConfig s3OutputConfig;
@Inject
- private ExternalHdfsOutputConfig hdfsOutputConfig;
+ private HdfsOutputConfig hdfsOutputConfig;
private Properties properties;
@@ -258,7 +258,7 @@
defaultValue = "false",
sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
)
- @Value("${" + LogFeederConstants.CLOUD_STORAGE_USE_HDFS_CLIENT + ":false}")
+ @Value("${" + LogFeederConstants.CLOUD_STORAGE_USE_HDFS_CLIENT + ":true}")
private boolean useCloudHdfsClient;
@LogSearchPropertyDescription(
@@ -281,15 +281,6 @@
private String cloudBasePath;
@LogSearchPropertyDescription(
- name = LogFeederConstants.HDFS_USER,
- description = "Overrides HADOOP_USER_NAME variable at runtime",
- examples = {"hdfs"},
- sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
- )
- @Value("${"+ LogFeederConstants.HDFS_USER + ":}")
- private String logfeederHdfsUser;
-
- @LogSearchPropertyDescription(
name = LogFeederConstants.CLOUD_STORAGE_USE_FILTERS,
description = "Use filters for inputs (with filters the output format will be JSON)",
examples = {"true"},
@@ -460,7 +451,7 @@
this.cloudStorageMode = cloudStorageMode;
}
- public ExternalHdfsOutputConfig getHdfsOutputConfig() {
+ public HdfsOutputConfig getHdfsOutputConfig() {
return hdfsOutputConfig;
}
@@ -480,7 +471,7 @@
this.rolloverConfig = rolloverConfig;
}
- public void setHdfsOutputConfig(ExternalHdfsOutputConfig hdfsOutputConfig) {
+ public void setHdfsOutputConfig(HdfsOutputConfig hdfsOutputConfig) {
this.hdfsOutputConfig = hdfsOutputConfig;
}
@@ -512,14 +503,6 @@
return useCloudHdfsClient;
}
- public String getLogfeederHdfsUser() {
- return logfeederHdfsUser;
- }
-
- public void setLogfeederHdfsUser(String logfeederHdfsUser) {
- this.logfeederHdfsUser = logfeederHdfsUser;
- }
-
public void setUseCloudHdfsClient(boolean useCloudHdfsClient) {
this.useCloudHdfsClient = useCloudHdfsClient;
}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/ExternalHdfsOutputConfig.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/ExternalHdfsOutputConfig.java
deleted file mode 100644
index fbbf869..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/ExternalHdfsOutputConfig.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.conf.output;
-
-import org.apache.ambari.logfeeder.common.LogFeederConstants;
-import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Configuration;
-
-@Configuration
-public class ExternalHdfsOutputConfig {
-
- @LogSearchPropertyDescription(
- name = LogFeederConstants.HDFS_HOST,
- description = "HDFS Name Node host.",
- examples = {"mynamenodehost"},
- sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
- )
- @Value("${"+ LogFeederConstants.HDFS_HOST + ":}")
- private String hdfsHost;
-
- @LogSearchPropertyDescription(
- name = LogFeederConstants.HDFS_PORT,
- description = "HDFS Name Node port",
- examples = {"9000"},
- sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
- )
- @Value("${"+ LogFeederConstants.HDFS_PORT + ":}")
- private Integer hdfsPort;
-
- @LogSearchPropertyDescription(
- name = LogFeederConstants.HDFS_FILE_PERMISSIONS,
- description = "Default permissions for created files on HDFS",
- examples = {"600"},
- defaultValue = "640",
- sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
- )
- @Value("${"+ LogFeederConstants.HDFS_FILE_PERMISSIONS + ":640}")
- private String hdfsFilePermissions;
-
- @LogSearchPropertyDescription(
- name = LogFeederConstants.HDFS_KERBEROS,
- description = "Enable kerberos support for HDFS",
- examples = {"true"},
- defaultValue = "false",
- sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
- )
- @Value("${"+ LogFeederConstants.HDFS_KERBEROS + ":false}")
- private boolean secure;
-
- public String getHdfsHost() {
- return hdfsHost;
- }
-
- public void setHdfsHost(String hdfsHost) {
- this.hdfsHost = hdfsHost;
- }
-
- public Integer getHdfsPort() {
- return hdfsPort;
- }
-
- public void setHdfsPort(Integer hdfsPort) {
- this.hdfsPort = hdfsPort;
- }
-
- public String getHdfsFilePermissions() {
- return hdfsFilePermissions;
- }
-
- public void setHdfsFilePermissions(String hdfsFilePermissions) {
- this.hdfsFilePermissions = hdfsFilePermissions;
- }
-
- public boolean isSecure() {
- return secure;
- }
-
- public void setSecure(boolean secure) {
- this.secure = secure;
- }
-}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/HdfsOutputConfig.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/HdfsOutputConfig.java
new file mode 100644
index 0000000..312f2f0
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/HdfsOutputConfig.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.conf.output;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class HdfsOutputConfig {
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.HDFS_HOST,
+ description = "HDFS Name Node host.",
+ examples = {"mynamenodehost"},
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.HDFS_HOST + ":}")
+ private String hdfsHost;
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.HDFS_PORT,
+ description = "HDFS Name Node port",
+ examples = {"9000"},
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.HDFS_PORT + ":}")
+ private Integer hdfsPort;
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.HDFS_FILE_PERMISSIONS,
+ description = "Default permissions for created files on HDFS",
+ examples = {"600"},
+ defaultValue = "640",
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.HDFS_FILE_PERMISSIONS + ":640}")
+ private String hdfsFilePermissions;
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.HDFS_USER,
+ description = "Overrides HADOOP_USER_NAME variable at runtime",
+ examples = {"hdfs"},
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.HDFS_USER + ":}")
+ private String logfeederHdfsUser;
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.HDFS_KERBEROS,
+ description = "Enable kerberos support for HDFS",
+ examples = {"true"},
+ defaultValue = "false",
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.HDFS_KERBEROS + ":false}")
+ private boolean hdfsKerberos;
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.HDFS_KERBEROS_KEYTAB,
+ description = "Kerberos keytab location for Log Feeder for communicating with secure HDFS. ",
+ examples = {"/etc/security/keytabs/mykeytab.keytab"},
+ defaultValue = "/etc/security/keytabs/logfeeder.service.keytab",
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.HDFS_KERBEROS_KEYTAB + ":/etc/security/keytabs/logfeeder.service.keytab}")
+ private String keytab;
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.HDFS_KERBEROS_PRINCIPAL,
+ description = "Kerberos principal for Log Feeder for communicating with secure HDFS. ",
+ examples = {"mylogfeeder/myhost1@EXAMPLE.COM"},
+ defaultValue = "logfeeder/_HOST",
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.HDFS_KERBEROS_PRINCIPAL + ":logfeeder/_HOST}")
+ private String principal;
+
+ public String getHdfsHost() {
+ return hdfsHost;
+ }
+
+ public void setHdfsHost(String hdfsHost) {
+ this.hdfsHost = hdfsHost;
+ }
+
+ public Integer getHdfsPort() {
+ return hdfsPort;
+ }
+
+ public void setHdfsPort(Integer hdfsPort) {
+ this.hdfsPort = hdfsPort;
+ }
+
+ public String getHdfsFilePermissions() {
+ return hdfsFilePermissions;
+ }
+
+ public void setHdfsFilePermissions(String hdfsFilePermissions) {
+ this.hdfsFilePermissions = hdfsFilePermissions;
+ }
+
+ public String getKeytab() {
+ return keytab;
+ }
+
+ public void setKeytab(String keytab) {
+ this.keytab = keytab;
+ }
+
+ public String getPrincipal() {
+ return principal;
+ }
+
+ public void setPrincipal(String principal) {
+ this.principal = principal;
+ }
+
+ public String getLogfeederHdfsUser() {
+ return logfeederHdfsUser;
+ }
+
+ public void setLogfeederHdfsUser(String logfeederHdfsUser) {
+ this.logfeederHdfsUser = logfeederHdfsUser;
+ }
+
+ public boolean isHdfsKerberos() {
+ return hdfsKerberos;
+ }
+
+ public void setHdfsKerberos(boolean hdfsKerberos) {
+ this.hdfsKerberos = hdfsKerberos;
+ }
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java
index 31bfd0d..d383ed1 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java
@@ -47,7 +47,7 @@
for (Input input : inputConfigHolder.getInputManager().getInputList(serviceName)) {
for (FilterDescriptor filterDescriptor : inputConfigHolder.getFilterConfigList()) {
if (filterDescriptor == null) {
- logger.warn("Filter descriptor is smpty. Skipping...");
+ logger.warn("Filter descriptor is empty. Skipping...");
continue;
}
if (BooleanUtils.isFalse(filterDescriptor.isEnabled())) {
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java
index ac10b2d..c2e73b7 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java
@@ -52,7 +52,7 @@
final boolean useFilters = inputConfigHolder.getLogFeederProps().isCloudStorageUseFilters();
for (InputDescriptor inputDescriptor : inputConfigHolder.getInputConfigList()) {
if (inputDescriptor == null) {
- logger.warn("Input descriptor is smpty. Skipping...");
+ logger.warn("Input descriptor is empty. Skipping...");
continue;
}
LogFeederMode mode = inputConfigHolder.getLogFeederProps().getCloudStorageMode();
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
index dd0fe3e..4677461 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
@@ -74,7 +74,7 @@
private void loadInputs(String serviceName, InputConfigHolder inputConfigHolder) {
for (InputDescriptor inputDescriptor : inputConfigHolder.getInputConfigList()) {
if (inputDescriptor == null) {
- logger.warn("Input descriptor is smpty. Skipping...");
+ logger.warn("Input descriptor is empty. Skipping...");
continue;
}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java
index bd9e3df..ff0805d 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java
@@ -46,7 +46,7 @@
Input input = inputMarker.getInput();
// Update the block with the context fields
for (Map.Entry<String, String> entry : input.getInputDescriptor().getAddFields().entrySet()) {
- if (jsonObj.get(entry.getKey()) == null || entry.getKey().equals("cluster") && "null".equals(jsonObj.get(entry.getKey()))) {
+ if (jsonObj.get(entry.getKey()) == null || "cluster".equals(entry.getKey()) && "null".equals(jsonObj.get(entry.getKey()))) {
jsonObj.put(entry.getKey(), entry.getValue());
}
}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/ExternalHDFSUploadClient.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/ExternalHDFSUploadClient.java
deleted file mode 100644
index a23a715..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/ExternalHDFSUploadClient.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.output.cloud.upload;
-
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.conf.output.ExternalHdfsOutputConfig;
-import org.apache.ambari.logfeeder.util.LogFeederHDFSUtil;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-/**
- * HDFS (on-prem) specific uploader client that can work with an external HDFS.
- */
-public class ExternalHDFSUploadClient implements UploadClient {
-
- private static final Logger logger = LogManager.getLogger(ExternalHDFSUploadClient.class);
-
- private final ExternalHdfsOutputConfig hdfsOutputConfig;
- private final FsPermission fsPermission;
- private FileSystem fs;
-
- public ExternalHDFSUploadClient(ExternalHdfsOutputConfig hdfsOutputConfig) {
- this.hdfsOutputConfig = hdfsOutputConfig;
- this.fsPermission = new FsPermission(hdfsOutputConfig.getHdfsFilePermissions());
- }
-
- @Override
- public void init(LogFeederProps logFeederProps) {
- logger.info("Initialize external HDFS client ...");
- if (StringUtils.isNotBlank(logFeederProps.getLogfeederHdfsUser())) {
- logger.info("Using HADOOP_USER_NAME: {}", logFeederProps.getLogfeederHdfsUser());
- System.setProperty("HADOOP_USER_NAME", logFeederProps.getLogfeederHdfsUser());
- }
- this.fs = LogFeederHDFSUtil.buildFileSystem(
- hdfsOutputConfig.getHdfsHost(),
- String.valueOf(hdfsOutputConfig.getHdfsPort()));
- if (logFeederProps.getHdfsOutputConfig().isSecure()) {
- logger.info("Kerberos is enabled for external HDFS.");
- Configuration conf = fs.getConf();
- conf.set("hadoop.security.authentication", "kerberos");
- }
- }
-
- @Override
- public void upload(String source, String target) throws Exception {
- LogFeederHDFSUtil.copyFromLocal(source, target, fs, true, true, fsPermission);
- }
-
- @Override
- public void close() {
- LogFeederHDFSUtil.closeFileSystem(fs);
- }
-}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
index c2a8497..421c4c5 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
@@ -19,15 +19,17 @@
package org.apache.ambari.logfeeder.output.cloud.upload;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.conf.output.HdfsOutputConfig;
import org.apache.ambari.logfeeder.util.LogFeederHDFSUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.io.IOException;
-
/**
* HDFS client that uses core-site.xml file from the classpath to load the configuration.
* Can connect to S3 / GCS / WASB / ADLS if the core-site.xml is configured to use one of those cloud storages
@@ -35,37 +37,64 @@
public class HDFSUploadClient implements UploadClient {
private static final String FS_DEFAULT_FS = "fs.defaultFS";
+ private static final String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication";
private static final Logger logger = LogManager.getLogger(HDFSUploadClient.class);
+ private final boolean externalHdfs;
+ private final HdfsOutputConfig hdfsOutputConfig;
+ private final FsPermission fsPermission;
private FileSystem fs;
+ public HDFSUploadClient(HdfsOutputConfig hdfsOutputConfig, boolean externalHdfs) {
+ this.hdfsOutputConfig = hdfsOutputConfig;
+ this.externalHdfs = externalHdfs;
+ this.fsPermission = new FsPermission(hdfsOutputConfig.getHdfsFilePermissions());
+ }
+
@Override
public void init(LogFeederProps logFeederProps) {
- logger.info("Initialize HDFS client (cloud mode), using core-site.xml from the classpath.");
- Configuration configuration = new Configuration();
+ final Configuration configuration;
+ if (externalHdfs) {
+ configuration = LogFeederHDFSUtil.buildHdfsConfiguration(hdfsOutputConfig.getHdfsHost(), String.valueOf(hdfsOutputConfig.getHdfsPort()), "hdfs");
+ logger.info("Using external HDFS client as core-site.xml is not located on the classpath.");
+ } else {
+ configuration = new Configuration();
+ logger.info("Initialize HDFS client (cloud mode), using core-site.xml from the classpath.");
+ }
if (StringUtils.isNotBlank(logFeederProps.getCustomFs())) {
configuration.set(FS_DEFAULT_FS, logFeederProps.getCustomFs());
}
- if (StringUtils.isNotBlank(logFeederProps.getLogfeederHdfsUser()) && isHadoopFileSystem(configuration)) {
- logger.info("Using HADOOP_USER_NAME: {}", logFeederProps.getLogfeederHdfsUser());
- System.setProperty("HADOOP_USER_NAME", logFeederProps.getLogfeederHdfsUser());
+ if (hdfsOutputConfig.isHdfsKerberos()) {
+ logger.info("Kerberos is enabled for HDFS.");
+ configuration.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+ final String principal = hdfsOutputConfig.getPrincipal().replace("_HOST", LogFeederUtil.hostName);
+ UserGroupInformation.setConfiguration(configuration);
+ try {
+ UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, hdfsOutputConfig.getKeytab());
+ UserGroupInformation.setLoginUser(ugi);
+ } catch (Exception e) {
+ logger.error("Error during kerberos login", e);
+ throw new RuntimeException(e);
+ }
+ } else {
+ if (StringUtils.isNotBlank(hdfsOutputConfig.getLogfeederHdfsUser())) {
+ logger.info("Using HADOOP_USER_NAME: {}", hdfsOutputConfig.getLogfeederHdfsUser());
+ System.setProperty("HADOOP_USER_NAME", hdfsOutputConfig.getLogfeederHdfsUser());
+ }
}
+ logger.info("HDFS client - will use '{}' permission for uploaded files", hdfsOutputConfig.getHdfsFilePermissions());
this.fs = LogFeederHDFSUtil.buildFileSystem(configuration);
}
@Override
public void upload(String source, String target) throws Exception {
- LogFeederHDFSUtil.copyFromLocal(source, target, fs, true, true, null);
+ LogFeederHDFSUtil.copyFromLocal(source, target, fs, true, true, this.fsPermission);
}
@Override
- public void close() throws IOException {
+ public void close() {
LogFeederHDFSUtil.closeFileSystem(fs);
}
- private boolean isHadoopFileSystem(Configuration conf) {
- return conf.get(FS_DEFAULT_FS).contains("hdfs://");
- }
-
}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java
index bea2943..27d69c7 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java
@@ -43,11 +43,11 @@
if (useHdfsClient && checkCoreSiteIsOnClasspath(logFeederProps)) {
logger.info("The core-site.xml from the classpath will be used to figure it out the cloud output settings.");
logFeederProps.setCloudStorageDestination(CloudStorageDestination.DEFAULT_FS);
- return new HDFSUploadClient();
+ return new HDFSUploadClient(logFeederProps.getHdfsOutputConfig(), false);
}
else if (CloudStorageDestination.HDFS.equals(destType)) {
logger.info("External HDFS output will be used.");
- return new ExternalHDFSUploadClient(logFeederProps.getHdfsOutputConfig());
+ return new HDFSUploadClient(logFeederProps.getHdfsOutputConfig(), true);
} else if (CloudStorageDestination.S3.equals(destType)) {
if (useHdfsClient) {
logger.info("S3 cloud output will be used with HDFS client.");