AMBARI-24878 - Infra Manager: kerberos support (#14)
diff --git a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java
index 0118c76..ddc4f00 100644
--- a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java
+++ b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java
@@ -18,10 +18,16 @@
*/
package org.apache.ambari.infra;
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import static org.apache.commons.lang.StringUtils.isBlank;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.commons.io.IOUtils;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.methods.CloseableHttpResponse;
@@ -36,15 +42,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.charset.Charset;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.commons.lang.StringUtils.isBlank;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
// TODO: use swagger
public class InfraClient implements AutoCloseable {
@@ -96,6 +97,12 @@
try {
String responseText = execute(new HttpPost(uriBuilder.build())).getBody();
Map<String, Object> responseContent = new ObjectMapper().readValue(responseText, new TypeReference<HashMap<String,Object>>() {});
+ if (!responseContent.containsKey("jobId"))
+ throw new NullPointerException("jobId is not found in start job responseContent");
+ if (!responseContent.containsKey("jobExecutionData"))
+ throw new NullPointerException("jobExecutionData is not found in start job responseContent");
+ if (!((Map)responseContent.get("jobExecutionData")).containsKey("id"))
+ throw new NullPointerException("id is not found in jobExecutionData");
return new JobExecutionInfo(responseContent.get("jobId").toString(), ((Map)responseContent.get("jobExecutionData")).get("id").toString());
} catch (URISyntaxException | JsonParseException | JsonMappingException e) {
throw new RuntimeException(e);
diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/ArchivingParameters.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/ArchivingParameters.java
index 6a36f72..5c783d6 100644
--- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/ArchivingParameters.java
+++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/ArchivingParameters.java
@@ -54,6 +54,8 @@
@JsonSerialize(converter = FsPermissionToStringConverter.class)
@JsonDeserialize(converter = StringToFsPermissionConverter.class)
private FsPermission hdfsFilePermission;
+ private String hdfsKerberosPrincipal;
+ private String hdfsKerberosKeytabPath;
private String start;
private String end;
@JsonSerialize(converter = DurationToStringConverter.class)
@@ -172,6 +174,22 @@
this.hdfsFilePermission = hdfsFilePermission;
}
+ public String getHdfsKerberosPrincipal() {
+ return hdfsKerberosPrincipal;
+ }
+
+ public void setHdfsKerberosPrincipal(String hdfsKerberosPrincipal) {
+ this.hdfsKerberosPrincipal = hdfsKerberosPrincipal;
+ }
+
+ public String getHdfsKerberosKeytabPath() {
+ return hdfsKerberosKeytabPath;
+ }
+
+ public void setHdfsKerberosKeytabPath(String hdfsKerberosKeytabPath) {
+ this.hdfsKerberosKeytabPath = hdfsKerberosKeytabPath;
+ }
+
public Optional<S3Properties> s3Properties() {
if (isBlank(s3BucketName))
return Optional.empty();
@@ -183,6 +201,18 @@
s3Endpoint));
}
+ public Optional<HdfsProperties> hdfsProperties() {
+ if (isBlank(hdfsDestinationDirectory))
+ return Optional.empty();
+
+ return Optional.of(new HdfsProperties(
+ hdfsEndpoint,
+ hdfsDestinationDirectory,
+ hdfsFilePermission,
+ hdfsKerberosPrincipal,
+ hdfsKerberosKeytabPath));
+ }
+
public String getStart() {
return start;
}
@@ -234,12 +264,9 @@
break;
case HDFS:
- if (isBlank(hdfsEndpoint))
- throw new IllegalArgumentException(String.format(
- "The property hdfsEndpoint can not be null or empty string when destination is set to %s!", HDFS.name()));
- if (isBlank(hdfsDestinationDirectory))
- throw new IllegalArgumentException(String.format(
- "The property hdfsDestinationDirectory can not be null or empty string when destination is set to %s!", HDFS.name()));
+ hdfsProperties()
+ .orElseThrow(() -> new IllegalArgumentException("HDFS related properties must be set if the destination is " + HDFS.name()))
+ .validate();
}
requireNonNull(solr, "No solr query was specified for archiving job!");
diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java
index 85fb364..af522d3 100644
--- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java
+++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java
@@ -32,7 +32,6 @@
import org.apache.ambari.infra.job.JobContextRepository;
import org.apache.ambari.infra.job.JobScheduler;
import org.apache.ambari.infra.job.ObjectSource;
-import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
@@ -103,8 +102,8 @@
break;
case HDFS:
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
- conf.set("fs.defaultFS", parameters.getHdfsEndpoint());
- fileAction.add(new HdfsUploader(conf, new Path(parameters.getHdfsDestinationDirectory()), parameters.getHdfsFilePermission()));
+ fileAction.add(new HdfsUploader(conf,
+ parameters.hdfsProperties().orElseThrow(() -> new IllegalStateException("HDFS properties are not provided!"))));
break;
case LOCAL:
baseDir = new File(parameters.getLocalDestinationDirectory());
diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java
index a573562..8ad576c 100644
--- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java
+++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java
@@ -23,7 +23,6 @@
import static org.apache.commons.lang.StringUtils.isBlank;
import java.time.Duration;
-import java.util.Optional;
import org.apache.ambari.infra.job.JobProperties;
import org.apache.ambari.infra.json.DurationToStringConverter;
@@ -40,6 +39,7 @@
private String fileNameSuffixDateFormat;
private Duration ttl;
private SolrProperties solr;
+
private String s3AccessFile;
private String s3KeyPrefix;
private String s3BucketName;
@@ -48,6 +48,8 @@
private String hdfsEndpoint;
private String hdfsDestinationDirectory;
private FsPermission hdfsFilePermission;
+ private String hdfsKerberosPrincipal;
+ private String hdfsKerberosKeytabPath;
public int getReadBlockSize() {
return readBlockSize;
@@ -145,17 +147,6 @@
this.s3Endpoint = s3Endpoint;
}
- public Optional<S3Properties> s3Properties() {
- if (isBlank(s3BucketName))
- return Optional.empty();
-
- return Optional.of(new S3Properties(
- s3AccessFile,
- s3KeyPrefix,
- s3BucketName,
- s3Endpoint));
- }
-
public String getHdfsEndpoint() {
return hdfsEndpoint;
}
@@ -180,6 +171,22 @@
this.hdfsDestinationDirectory = hdfsDestinationDirectory;
}
+ public String getHdfsKerberosPrincipal() {
+ return hdfsKerberosPrincipal;
+ }
+
+ public void setHdfsKerberosPrincipal(String hdfsKerberosPrincipal) {
+ this.hdfsKerberosPrincipal = hdfsKerberosPrincipal;
+ }
+
+ public String getHdfsKerberosKeytabPath() {
+ return hdfsKerberosKeytabPath;
+ }
+
+ public void setHdfsKerberosKeytabPath(String hdfsKerberosKeytabPath) {
+ this.hdfsKerberosKeytabPath = hdfsKerberosKeytabPath;
+ }
+
private int getIntJobParameter(JobParameters jobParameters, String parameterName, int defaultValue) {
String valueText = jobParameters.getString(parameterName);
if (isBlank(valueText))
@@ -203,6 +210,8 @@
archivingParameters.setHdfsEndpoint(jobParameters.getString("hdfsEndpoint", hdfsEndpoint));
archivingParameters.setHdfsDestinationDirectory(jobParameters.getString("hdfsDestinationDirectory", hdfsDestinationDirectory));
archivingParameters.setHdfsFilePermission(toFsPermission(jobParameters.getString("hdfsFilePermission", FsPermissionToStringConverter.toString(hdfsFilePermission))));
+ archivingParameters.setHdfsKerberosPrincipal(jobParameters.getString("hdfsKerberosPrincipal", hdfsKerberosPrincipal));
+ archivingParameters.setHdfsKerberosKeytabPath(jobParameters.getString("hdfsKerberosKeytabPath", hdfsKerberosKeytabPath));
archivingParameters.setSolr(solr.merge(jobParameters));
archivingParameters.setStart(jobParameters.getString("start"));
archivingParameters.setEnd(jobParameters.getString("end"));
diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/HdfsProperties.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/HdfsProperties.java
new file mode 100644
index 0000000..da4137f
--- /dev/null
+++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/HdfsProperties.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import static org.apache.commons.lang.StringUtils.isBlank;
+import static org.apache.commons.lang.StringUtils.isNotBlank;
+
+import org.apache.hadoop.fs.permission.FsPermission;
+
+public class HdfsProperties {
+ private static final String DEFAULT_FILE_PERMISSION = "640";
+
+ private final String hdfsEndpoint;
+ private final String hdfsDestinationDirectory;
+ private final FsPermission hdfsFilePermission;
+ private final String hdfsKerberosPrincipal;
+ private final String hdfsKerberosKeytabPath;
+
+ public HdfsProperties(String hdfsEndpoint, String hdfsDestinationDirectory, FsPermission hdfsFilePermission, String hdfsKerberosPrincipal, String hdfsKerberosKeytabPath) {
+ this.hdfsEndpoint = hdfsEndpoint;
+ this.hdfsDestinationDirectory = hdfsDestinationDirectory;
+ this.hdfsFilePermission = hdfsFilePermission == null ? new FsPermission(DEFAULT_FILE_PERMISSION) : hdfsFilePermission;
+ this.hdfsKerberosPrincipal = hdfsKerberosPrincipal;
+ this.hdfsKerberosKeytabPath = hdfsKerberosKeytabPath;
+ }
+
+ public String getHdfsEndpoint() {
+ return hdfsEndpoint;
+ }
+
+ public String getHdfsDestinationDirectory() {
+ return hdfsDestinationDirectory;
+ }
+
+ public FsPermission getHdfsFilePermission() {
+ return hdfsFilePermission;
+ }
+
+ public String getHdfsKerberosPrincipal() {
+ return hdfsKerberosPrincipal;
+ }
+
+ public String getHdfsKerberosKeytabPath() {
+ return hdfsKerberosKeytabPath;
+ }
+
+ @Override
+ public String toString() {
+ return "HdfsProperties{" +
+ "hdfsEndpoint='" + hdfsEndpoint + '\'' +
+ ", hdfsDestinationDirectory='" + hdfsDestinationDirectory + '\'' +
+ ", hdfsFilePermission=" + hdfsFilePermission +
+ ", hdfsKerberosPrincipal='" + hdfsKerberosPrincipal + '\'' +
+ ", hdfsKerberosKeytabPath='" + hdfsKerberosKeytabPath + '\'' +
+ '}';
+ }
+
+ public void validate() {
+ if (isBlank(hdfsDestinationDirectory))
+ throw new IllegalArgumentException("The property hdfsDestinationDirectory can not be null or empty string!");
+
+ if (isNotBlank(hdfsKerberosPrincipal) && isBlank(hdfsKerberosKeytabPath))
+ throw new IllegalArgumentException("The property hdfsKerberosPrincipal is specified but hdfsKerberosKeytabPath is blank!");
+
+ if (isBlank(hdfsKerberosPrincipal) && isNotBlank(hdfsKerberosKeytabPath))
+ throw new IllegalArgumentException("The property hdfsKerberosKeytabPath is specified but hdfsKerberosPrincipal is blank!");
+ }
+}
diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/HdfsUploader.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/HdfsUploader.java
index 469326f..ff48673 100644
--- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/HdfsUploader.java
+++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/HdfsUploader.java
@@ -18,6 +18,8 @@
*/
package org.apache.ambari.infra.job.archive;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -25,31 +27,59 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.io.ClassPathResource;
public class HdfsUploader extends AbstractFileAction {
+ private static final Logger LOG = LoggerFactory.getLogger(HdfsUploader.class);
- private static final String DEFAULT_FILE_PERMISSION = "640";
private final Configuration configuration;
- private final Path destinationDirectory;
- private final FsPermission fsPermission;
+ private final HdfsProperties properties;
- public HdfsUploader(Configuration configuration, Path destinationDirectory, FsPermission fsPermission) {
- this.destinationDirectory = destinationDirectory;
+ public HdfsUploader(Configuration configuration, HdfsProperties properties) {
+ this.properties = properties;
this.configuration = configuration;
- this.fsPermission = fsPermission == null ? new FsPermission(DEFAULT_FILE_PERMISSION) : fsPermission;
+
+ if (new ClassPathResource("core-site.xml").exists()) {
+ LOG.info("Hdfs core-site.xml is found in the classpath.");
+ }
+ else {
+ LOG.warn("Hdfs core-site.xml is not found in the classpath. Using defaults.");
+ }
+ if (new ClassPathResource("hdfs-site.xml").exists()) {
+ LOG.info("Hdfs hdfs-site.xml is found in the classpath.");
+ }
+ else {
+ LOG.warn("Hdfs hdfs-site.xml is not found in the classpath. Using defaults.");
+ }
+ if (isNotBlank(properties.getHdfsEndpoint())) {
+ LOG.info("Hdfs endpoint is defined in Infra Manager properties. Setting fs.defaultFS to {}", properties.getHdfsEndpoint());
+ this.configuration.set("fs.defaultFS", properties.getHdfsEndpoint());
+ }
+
+ UserGroupInformation.setConfiguration(configuration);
}
@Override
protected File onPerform(File inputFile) {
+ try {
+ if ("kerberos".equalsIgnoreCase(configuration.get("hadoop.security.authentication")))
+ UserGroupInformation.loginUserFromKeytab(properties.getHdfsKerberosPrincipal(), properties.getHdfsKerberosKeytabPath());
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+
try (FileSystem fileSystem = FileSystem.get(configuration)) {
- Path destination = new Path(destinationDirectory, inputFile.getName());
+
+ Path destination = new Path(properties.getHdfsDestinationDirectory(), inputFile.getName());
if (fileSystem.exists(destination)) {
throw new UnsupportedOperationException(String.format("File '%s' already exists!", destination));
}
fileSystem.copyFromLocalFile(new Path(inputFile.getAbsolutePath()), destination);
- fileSystem.setPermission(destination, fsPermission);
+ fileSystem.setPermission(destination, properties.getHdfsFilePermission());
return inputFile;
}
diff --git a/pom.xml b/pom.xml
index 29271c1..b6f52f9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -31,7 +31,7 @@
<deb.python.ver>python (>= 2.6)</deb.python.ver>
<deb.architecture>amd64</deb.architecture>
<deb.dependency.list>${deb.python.ver}</deb.dependency.list>
- <hadoop.version>3.0.0</hadoop.version>
+ <hadoop.version>3.1.1</hadoop.version>
<surefire.argLine>-Xmx1024m -Xms512m</surefire.argLine>
<zookeeper.version>3.4.6.2.3.0.0-2557</zookeeper.version>
<ambari-metrics.version>2.7.0.0.0</ambari-metrics.version>