blob: 3982b1af042973812e8712596ec7a5f5ceb51ca9 [file] [log] [blame]
/*
* Copyright 2019 WeBank
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.webank.wedatasphere.linkis.hadoop.common.utils
import java.io.File
import java.nio.file.Paths
import java.security.PrivilegedExceptionAction
import com.webank.wedatasphere.linkis.common.conf.Configuration.hadoopConfDir
import com.webank.wedatasphere.linkis.hadoop.common.conf.HadoopConf._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.security.UserGroupInformation
/**
* Created by enjoyyin on 2019/5/27.
*/
object HDFSUtils {
def getConfiguration(user: String): Configuration = getConfiguration(user, hadoopConfDir)
def getConfiguration(user: String, hadoopConfDir: String): Configuration = {
val confPath = new File(hadoopConfDir)
if(!confPath.exists() || confPath.isFile) {
throw new RuntimeException(s"Create hadoop configuration failed, path $hadoopConfDir not exists.")
}
val conf = new Configuration()
conf.addResource(new Path(Paths.get(hadoopConfDir, "core-site.xml").toAbsolutePath.toFile.getAbsolutePath))
conf.addResource(new Path(Paths.get(hadoopConfDir, "hdfs-site.xml").toAbsolutePath.toFile.getAbsolutePath))
conf.addResource(new Path(Paths.get(hadoopConfDir, "yarn-site.xml").toAbsolutePath.toFile.getAbsolutePath))
conf
}
def getHDFSRootUserFileSystem: FileSystem = getHDFSRootUserFileSystem(getConfiguration(HADOOP_ROOT_USER.getValue))
def getHDFSRootUserFileSystem(conf: org.apache.hadoop.conf.Configuration): FileSystem =
getHDFSUserFileSystem(HADOOP_ROOT_USER.getValue, conf)
def getHDFSUserFileSystem(userName: String): FileSystem = getHDFSUserFileSystem(userName, getConfiguration(userName))
def getHDFSUserFileSystem(userName: String, conf: org.apache.hadoop.conf.Configuration): FileSystem =
getUserGroupInformation(userName)
.doAs(new PrivilegedExceptionAction[FileSystem]{
def run = FileSystem.get(conf)
})
def getUserGroupInformation(userName: String): UserGroupInformation ={
if(KERBEROS_ENABLE.getValue) {
val path = new File(KEYTAB_FILE.getValue , userName + ".keytab").getPath
val user = getKerberosUser(userName)
UserGroupInformation.setConfiguration(getConfiguration(userName))
UserGroupInformation.loginUserFromKeytabAndReturnUGI(user, path)
} else {
UserGroupInformation.createRemoteUser(userName)
}
}
def getKerberosUser(userName: String): String = {
var user = userName
if(KEYTAB_HOST_ENABLED.getValue){
user = user+ "/" + KEYTAB_HOST.getValue
}
user
}
}