AMBARI-24878 - Infra Manager: kerberos support (#14)
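
Summary of the change:
- The InfraClient integration-test client now fails fast with a descriptive
  error when the start-job response is missing jobId, jobExecutionData, or
  the nested execution id.
- HDFS destination settings (endpoint, destination directory, file
  permission, and the new Kerberos principal/keytab path) move into a new
  HdfsProperties value class with its own validate() method; hdfsEndpoint
  becomes optional and may instead come from a core-site.xml on the
  classpath.
- HdfsUploader logs in from the configured keytab via UserGroupInformation
  whenever hadoop.security.authentication is kerberos.
- Hadoop is bumped from 3.0.0 to 3.1.1.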

diff --git a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java
index 0118c76..ddc4f00 100644
--- a/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java
+++ b/ambari-infra-manager-it/src/test/java/org/apache/ambari/infra/InfraClient.java
@@ -18,10 +18,16 @@
  */
 package org.apache.ambari.infra;
 
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import static org.apache.commons.lang.StringUtils.isBlank;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.http.client.ClientProtocolException;
 import org.apache.http.client.methods.CloseableHttpResponse;
@@ -36,15 +42,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.charset.Charset;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.commons.lang.StringUtils.isBlank;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 
 // TODO: use swagger
 public class InfraClient implements AutoCloseable {
@@ -96,6 +97,12 @@
     try {
       String responseText = execute(new HttpPost(uriBuilder.build())).getBody();
       Map<String, Object> responseContent = new ObjectMapper().readValue(responseText, new TypeReference<HashMap<String,Object>>() {});
+      if (!responseContent.containsKey("jobId"))
+        throw new NullPointerException("jobId is not found in start job responseContent");
+      if (!responseContent.containsKey("jobExecutionData"))
+        throw new NullPointerException("jobExecutionData is not found in start job responseContent");
+      if (!((Map)responseContent.get("jobExecutionData")).containsKey("id"))
+        throw new NullPointerException("id is not found in jobExecutionData");
       return new JobExecutionInfo(responseContent.get("jobId").toString(), ((Map)responseContent.get("jobExecutionData")).get("id").toString());
     } catch (URISyntaxException | JsonParseException | JsonMappingException e) {
       throw new RuntimeException(e);
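
For reference, the new guards assume the start-job endpoint answers with a body of roughly the shape below; the concrete values are illustrative, only the asserted keys come from the code above. A minimal self-contained sketch:

    import java.util.HashMap;
    import java.util.Map;

    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class StartJobResponseSketch {
      public static void main(String[] args) throws Exception {
        // Illustrative payload; a real one is produced by the start-job REST call.
        String responseText = "{\"jobId\":\"archive_service_logs\",\"jobExecutionData\":{\"id\":42}}";
        Map<String, Object> response =
            new ObjectMapper().readValue(responseText, new TypeReference<HashMap<String, Object>>() {});
        // The guards assert these keys before dereferencing them, so a malformed
        // response now fails with a descriptive message instead of a bare NPE.
        String jobId = response.get("jobId").toString();
        String executionId = ((Map<?, ?>) response.get("jobExecutionData")).get("id").toString();
        System.out.println(jobId + "/" + executionId);
      }
    }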
diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/ArchivingParameters.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/ArchivingParameters.java
index 6a36f72..5c783d6 100644
--- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/ArchivingParameters.java
+++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/ArchivingParameters.java
@@ -54,6 +54,8 @@
   @JsonSerialize(converter = FsPermissionToStringConverter.class)
   @JsonDeserialize(converter = StringToFsPermissionConverter.class)
   private FsPermission hdfsFilePermission;
+  private String hdfsKerberosPrincipal;
+  private String hdfsKerberosKeytabPath;
   private String start;
   private String end;
   @JsonSerialize(converter = DurationToStringConverter.class)
@@ -172,6 +174,22 @@
     this.hdfsFilePermission = hdfsFilePermission;
   }
 
+  public String getHdfsKerberosPrincipal() {
+    return hdfsKerberosPrincipal;
+  }
+
+  public void setHdfsKerberosPrincipal(String hdfsKerberosPrincipal) {
+    this.hdfsKerberosPrincipal = hdfsKerberosPrincipal;
+  }
+
+  public String getHdfsKerberosKeytabPath() {
+    return hdfsKerberosKeytabPath;
+  }
+
+  public void setHdfsKerberosKeytabPath(String hdfsKerberosKeytabPath) {
+    this.hdfsKerberosKeytabPath = hdfsKerberosKeytabPath;
+  }
+
   public Optional<S3Properties> s3Properties() {
     if (isBlank(s3BucketName))
       return Optional.empty();
@@ -183,6 +201,18 @@
             s3Endpoint));
   }
 
+  public Optional<HdfsProperties> hdfsProperties() {
+    if (isBlank(hdfsDestinationDirectory))
+      return Optional.empty();
+
+    return Optional.of(new HdfsProperties(
+            hdfsEndpoint,
+            hdfsDestinationDirectory,
+            hdfsFilePermission,
+            hdfsKerberosPrincipal,
+            hdfsKerberosKeytabPath));
+  }
+
   public String getStart() {
     return start;
   }
@@ -234,12 +264,9 @@
         break;
 
       case HDFS:
-        if (isBlank(hdfsEndpoint))
-          throw new IllegalArgumentException(String.format(
-                  "The property hdfsEndpoint can not be null or empty string when destination is set to %s!", HDFS.name()));
-        if (isBlank(hdfsDestinationDirectory))
-          throw new IllegalArgumentException(String.format(
-                  "The property hdfsDestinationDirectory can not be null or empty string when destination is set to %s!", HDFS.name()));
+        hdfsProperties()
+                .orElseThrow(() -> new IllegalArgumentException("HDFS related properties must be set if the destination is " + HDFS.name()))
+                .validate();
     }
 
     requireNonNull(solr, "No solr query was specified for archiving job!");
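
Note the slight behavior change in the HDFS branch: a blank hdfsEndpoint no longer fails validation, since HdfsUploader can pick fs.defaultFS up from a classpath core-site.xml instead. A sketch of the new flow, assuming the usual no-arg bean constructor; all values are made up:

    // Sketch of the HDFS branch above; directory, principal, and keytab are made up.
    ArchivingParameters parameters = new ArchivingParameters();
    parameters.setHdfsDestinationDirectory("/archive/logs");
    parameters.setHdfsKerberosPrincipal("infra@EXAMPLE.COM");
    parameters.setHdfsKerberosKeytabPath("/etc/security/keytabs/infra.keytab");

    parameters.hdfsProperties()   // empty Optional when hdfsDestinationDirectory is blank
            .orElseThrow(() -> new IllegalArgumentException(
                    "HDFS related properties must be set if the destination is HDFS"))
            .validate();          // rejects a principal without a keytab and vice versa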
diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java
index 85fb364..af522d3 100644
--- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java
+++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingConfiguration.java
@@ -32,7 +32,6 @@
 import org.apache.ambari.infra.job.JobContextRepository;
 import org.apache.ambari.infra.job.JobScheduler;
 import org.apache.ambari.infra.job.ObjectSource;
-import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.batch.core.Job;
@@ -103,8 +102,8 @@
         break;
       case HDFS:
         org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
-        conf.set("fs.defaultFS", parameters.getHdfsEndpoint());
-        fileAction.add(new HdfsUploader(conf, new Path(parameters.getHdfsDestinationDirectory()), parameters.getHdfsFilePermission()));
+        fileAction.add(new HdfsUploader(conf,
+                parameters.hdfsProperties().orElseThrow(() -> new IllegalStateException("HDFS properties are not provided!"))));
         break;
       case LOCAL:
         baseDir = new File(parameters.getLocalDestinationDirectory());
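
The Path import and the unconditional fs.defaultFS override are gone because HdfsUploader now owns that wiring: it sets fs.defaultFS only when a non-blank hdfsEndpoint is configured, so the plain Configuration created here lets a core-site.xml on the classpath supply the endpoint instead.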
diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java
index a573562..8ad576c 100644
--- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java
+++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/DocumentArchivingProperties.java
@@ -23,7 +23,6 @@
 import static org.apache.commons.lang.StringUtils.isBlank;
 
 import java.time.Duration;
-import java.util.Optional;
 
 import org.apache.ambari.infra.job.JobProperties;
 import org.apache.ambari.infra.json.DurationToStringConverter;
@@ -40,6 +39,7 @@
   private String fileNameSuffixDateFormat;
   private Duration ttl;
   private SolrProperties solr;
+
   private String s3AccessFile;
   private String s3KeyPrefix;
   private String s3BucketName;
@@ -48,6 +48,8 @@
   private String hdfsEndpoint;
   private String hdfsDestinationDirectory;
   private FsPermission hdfsFilePermission;
+  private String hdfsKerberosPrincipal;
+  private String hdfsKerberosKeytabPath;
 
   public int getReadBlockSize() {
     return readBlockSize;
@@ -145,17 +147,6 @@
     this.s3Endpoint = s3Endpoint;
   }
 
-  public Optional<S3Properties> s3Properties() {
-    if (isBlank(s3BucketName))
-      return Optional.empty();
-
-    return Optional.of(new S3Properties(
-            s3AccessFile,
-            s3KeyPrefix,
-            s3BucketName,
-            s3Endpoint));
-  }
-
   public String getHdfsEndpoint() {
     return hdfsEndpoint;
   }
@@ -180,6 +171,22 @@
     this.hdfsDestinationDirectory = hdfsDestinationDirectory;
   }
 
+  public String getHdfsKerberosPrincipal() {
+    return hdfsKerberosPrincipal;
+  }
+
+  public void setHdfsKerberosPrincipal(String hdfsKerberosPrincipal) {
+    this.hdfsKerberosPrincipal = hdfsKerberosPrincipal;
+  }
+
+  public String getHdfsKerberosKeytabPath() {
+    return hdfsKerberosKeytabPath;
+  }
+
+  public void setHdfsKerberosKeytabPath(String hdfsKerberosKeytabPath) {
+    this.hdfsKerberosKeytabPath = hdfsKerberosKeytabPath;
+  }
+
   private int getIntJobParameter(JobParameters jobParameters, String parameterName, int defaultValue) {
     String valueText = jobParameters.getString(parameterName);
     if (isBlank(valueText))
@@ -203,6 +210,8 @@
     archivingParameters.setHdfsEndpoint(jobParameters.getString("hdfsEndpoint", hdfsEndpoint));
     archivingParameters.setHdfsDestinationDirectory(jobParameters.getString("hdfsDestinationDirectory", hdfsDestinationDirectory));
     archivingParameters.setHdfsFilePermission(toFsPermission(jobParameters.getString("hdfsFilePermission", FsPermissionToStringConverter.toString(hdfsFilePermission))));
+    archivingParameters.setHdfsKerberosPrincipal(jobParameters.getString("hdfsKerberosPrincipal", hdfsKerberosPrincipal));
+    archivingParameters.setHdfsKerberosKeytabPath(jobParameters.getString("hdfsKerberosKeytabPath", hdfsKerberosKeytabPath));
     archivingParameters.setSolr(solr.merge(jobParameters));
     archivingParameters.setStart(jobParameters.getString("start"));
     archivingParameters.setEnd(jobParameters.getString("end"));
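
As before, a value passed at job launch wins over the configured default, now also for the two Kerberos settings. A small self-contained sketch of that precedence (parameter values are made up):

    import org.springframework.batch.core.JobParameters;
    import org.springframework.batch.core.JobParametersBuilder;

    public class ParameterPrecedenceSketch {
      public static void main(String[] args) {
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("hdfsKerberosPrincipal", "infra@EXAMPLE.COM")
                .toJobParameters();
        // getString(name, default) returns the launch-time value when present,
        // otherwise the default configured in the properties above.
        String principal = jobParameters.getString("hdfsKerberosPrincipal", "fallback@EXAMPLE.COM");
        System.out.println(principal); // infra@EXAMPLE.COM
      }
    }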
diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/HdfsProperties.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/HdfsProperties.java
new file mode 100644
index 0000000..da4137f
--- /dev/null
+++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/HdfsProperties.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.infra.job.archive;
+
+import static org.apache.commons.lang.StringUtils.isBlank;
+import static org.apache.commons.lang.StringUtils.isNotBlank;
+
+import org.apache.hadoop.fs.permission.FsPermission;
+
+public class HdfsProperties {
+  private static final String DEFAULT_FILE_PERMISSION = "640";
+
+  private final String hdfsEndpoint;
+  private final String hdfsDestinationDirectory;
+  private final FsPermission hdfsFilePermission;
+  private final String hdfsKerberosPrincipal;
+  private final String hdfsKerberosKeytabPath;
+
+  public HdfsProperties(String hdfsEndpoint, String hdfsDestinationDirectory, FsPermission hdfsFilePermission, String hdfsKerberosPrincipal, String hdfsKerberosKeytabPath) {
+    this.hdfsEndpoint = hdfsEndpoint;
+    this.hdfsDestinationDirectory = hdfsDestinationDirectory;
+    this.hdfsFilePermission = hdfsFilePermission == null ? new FsPermission(DEFAULT_FILE_PERMISSION) : hdfsFilePermission;
+    this.hdfsKerberosPrincipal = hdfsKerberosPrincipal;
+    this.hdfsKerberosKeytabPath = hdfsKerberosKeytabPath;
+  }
+
+  public String getHdfsEndpoint() {
+    return hdfsEndpoint;
+  }
+
+  public String getHdfsDestinationDirectory() {
+    return hdfsDestinationDirectory;
+  }
+
+  public FsPermission getHdfsFilePermission() {
+    return hdfsFilePermission;
+  }
+
+  public String getHdfsKerberosPrincipal() {
+    return hdfsKerberosPrincipal;
+  }
+
+  public String getHdfsKerberosKeytabPath() {
+    return hdfsKerberosKeytabPath;
+  }
+
+  @Override
+  public String toString() {
+    return "HdfsProperties{" +
+            "hdfsEndpoint='" + hdfsEndpoint + '\'' +
+            ", hdfsDestinationDirectory='" + hdfsDestinationDirectory + '\'' +
+            ", hdfsFilePermission=" + hdfsFilePermission +
+            ", hdfsKerberosPrincipal='" + hdfsKerberosPrincipal + '\'' +
+            ", hdfsKerberosKeytabPath='" + hdfsKerberosKeytabPath + '\'' +
+            '}';
+  }
+
+  public void validate() {
+    if (isBlank(hdfsDestinationDirectory))
+      throw new IllegalArgumentException("The property hdfsDestinationDirectory can not be null or empty string!");
+
+    if (isNotBlank(hdfsKerberosPrincipal) && isBlank(hdfsKerberosKeytabPath))
+      throw new IllegalArgumentException("The property hdfsKerberosPrincipal is specified but hdfsKerberosKeytabPath is blank!");
+
+    if (isBlank(hdfsKerberosPrincipal) && isNotBlank(hdfsKerberosKeytabPath))
+      throw new IllegalArgumentException("The property hdfsKerberosKeytabPath is specified but hdfsKerberosPrincipal is blank!");
+  }
+}
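
The constructor defaults a missing permission to 640, and validate() requires the destination directory plus an all-or-nothing pair of Kerberos settings. A quick illustration, assuming it sits next to HdfsProperties in the same package (endpoint, directory, and keytab path are made up):

    public class HdfsPropertiesSketch {
      public static void main(String[] args) {
        // A null permission falls back to the 640 default in the constructor.
        HdfsProperties ok = new HdfsProperties("hdfs://namenode:8020", "/archive/logs",
                null, "infra@EXAMPLE.COM", "/etc/security/keytabs/infra.keytab");
        ok.validate(); // passes: directory set, principal and keytab given together

        HdfsProperties bad = new HdfsProperties("hdfs://namenode:8020", "/archive/logs",
                null, "infra@EXAMPLE.COM", null);
        bad.validate(); // throws: hdfsKerberosPrincipal is specified but hdfsKerberosKeytabPath is blank
      }
    }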
diff --git a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/HdfsUploader.java b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/HdfsUploader.java
index 469326f..ff48673 100644
--- a/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/HdfsUploader.java
+++ b/ambari-infra-manager/src/main/java/org/apache/ambari/infra/job/archive/HdfsUploader.java
@@ -18,6 +18,8 @@
  */
 package org.apache.ambari.infra.job.archive;
 
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -25,31 +27,59 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.io.ClassPathResource;
 
 public class HdfsUploader extends AbstractFileAction {
+  private static final Logger LOG = LoggerFactory.getLogger(HdfsUploader.class);
 
-  private static final String DEFAULT_FILE_PERMISSION = "640";
   private final Configuration configuration;
-  private final Path destinationDirectory;
-  private final FsPermission fsPermission;
+  private final HdfsProperties properties;
 
-  public HdfsUploader(Configuration configuration, Path destinationDirectory, FsPermission fsPermission) {
-    this.destinationDirectory = destinationDirectory;
+  public HdfsUploader(Configuration configuration, HdfsProperties properties) {
+    this.properties = properties;
     this.configuration = configuration;
-    this.fsPermission = fsPermission == null ? new FsPermission(DEFAULT_FILE_PERMISSION) : fsPermission;
+
+    if (new ClassPathResource("core-site.xml").exists()) {
+      LOG.info("Hdfs core-site.xml is found in the classpath.");
+    }
+    else {
+      LOG.warn("Hdfs core-site.xml is not found in the classpath. Using defaults.");
+    }
+    if (new ClassPathResource("hdfs-site.xml").exists()) {
+      LOG.info("Hdfs hdfs-site.xml is found in the classpath.");
+    }
+    else {
+      LOG.warn("Hdfs hdfs-site.xml is not found in the classpath. Using defaults.");
+    }
+    if (isNotBlank(properties.getHdfsEndpoint())) {
+      LOG.info("Hdfs endpoint is defined in Infra Manager properties. Setting fs.defaultFS to {}", properties.getHdfsEndpoint());
+      this.configuration.set("fs.defaultFS", properties.getHdfsEndpoint());
+    }
+
+    UserGroupInformation.setConfiguration(configuration);
   }
 
   @Override
   protected File onPerform(File inputFile) {
+    try {
+      if ("kerberos".equalsIgnoreCase(configuration.get("hadoop.security.authentication")))
+        UserGroupInformation.loginUserFromKeytab(properties.getHdfsKerberosPrincipal(), properties.getHdfsKerberosKeytabPath());
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+
     try (FileSystem fileSystem = FileSystem.get(configuration)) {
-      Path destination = new Path(destinationDirectory, inputFile.getName());
+
+      Path destination = new Path(properties.getHdfsDestinationDirectory(), inputFile.getName());
       if (fileSystem.exists(destination)) {
         throw new UnsupportedOperationException(String.format("File '%s' already exists!", destination));
       }
 
       fileSystem.copyFromLocalFile(new Path(inputFile.getAbsolutePath()), destination);
-      fileSystem.setPermission(destination, fsPermission);
+      fileSystem.setPermission(destination, properties.getHdfsFilePermission());
 
       return inputFile;
     }
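
The keytab login above only fires when Hadoop security is enabled, which in practice comes from the core-site.xml the constructor probes for. A minimal sketch of the trigger condition; the principal and keytab path are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.UserGroupInformation;

    public class KerberosLoginSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Normally supplied by core-site.xml; set inline here for illustration only.
        conf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(conf);
        // Same call HdfsUploader.onPerform() makes before touching the FileSystem;
        // it authenticates the process-wide login user from the keytab.
        UserGroupInformation.loginUserFromKeytab(
            "infra@EXAMPLE.COM", "/etc/security/keytabs/infra.keytab");
      }
    }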
diff --git a/pom.xml b/pom.xml
index 29271c1..b6f52f9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -31,7 +31,7 @@
     <deb.python.ver>python (&gt;= 2.6)</deb.python.ver>
     <deb.architecture>amd64</deb.architecture>
     <deb.dependency.list>${deb.python.ver}</deb.dependency.list>
-    <hadoop.version>3.0.0</hadoop.version>
+    <hadoop.version>3.1.1</hadoop.version>
     <surefire.argLine>-Xmx1024m -Xms512m</surefire.argLine>
     <zookeeper.version>3.4.6.2.3.0.0-2557</zookeeper.version>
     <ambari-metrics.version>2.7.0.0.0</ambari-metrics.version>