blob: 9d2399207ec86e7a903a5a9241ca97b2798c5953 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.security.modules;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.util.HadoopUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.security.auth.Subject;
import java.io.File;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collection;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** Responsible for installing a Hadoop login user. */
public class HadoopModule implements SecurityModule {
private static final Logger LOG = LoggerFactory.getLogger(HadoopModule.class);
private final SecurityConfiguration securityConfig;
private final Configuration hadoopConfiguration;
public HadoopModule(
SecurityConfiguration securityConfiguration, Configuration hadoopConfiguration) {
this.securityConfig = checkNotNull(securityConfiguration);
this.hadoopConfiguration = checkNotNull(hadoopConfiguration);
}
@VisibleForTesting
public SecurityConfiguration getSecurityConfig() {
return securityConfig;
}
@Override
public void install() throws SecurityInstallException {
UserGroupInformation.setConfiguration(hadoopConfiguration);
UserGroupInformation loginUser;
try {
if (UserGroupInformation.isSecurityEnabled()
&& !StringUtils.isBlank(securityConfig.getKeytab())
&& !StringUtils.isBlank(securityConfig.getPrincipal())) {
String keytabPath = (new File(securityConfig.getKeytab())).getAbsolutePath();
UserGroupInformation.loginUserFromKeytab(securityConfig.getPrincipal(), keytabPath);
loginUser = UserGroupInformation.getLoginUser();
// supplement with any available tokens
String fileLocation =
System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
if (fileLocation != null) {
Credentials credentialsFromTokenStorageFile =
Credentials.readTokenStorageFile(
new File(fileLocation), hadoopConfiguration);
// if UGI uses Kerberos keytabs for login, do not load HDFS delegation token
// since
// the UGI would prefer the delegation token instead, which eventually expires
// and does not fallback to using Kerberos tickets
Credentials credentialsToBeAdded = new Credentials();
final Text hdfsDelegationTokenKind = new Text("HDFS_DELEGATION_TOKEN");
Collection<Token<? extends TokenIdentifier>> usrTok =
credentialsFromTokenStorageFile.getAllTokens();
// If UGI use keytab for login, do not load HDFS delegation token.
for (Token<? extends TokenIdentifier> token : usrTok) {
if (!token.getKind().equals(hdfsDelegationTokenKind)) {
credentialsToBeAdded.addToken(token.getService(), token);
}
}
loginUser.addCredentials(credentialsToBeAdded);
}
} else {
// login with current user credentials (e.g. ticket cache, OS login)
// note that the stored tokens are read automatically
try {
// Use reflection API to get the login user object
// UserGroupInformation.loginUserFromSubject(null);
Method loginUserFromSubjectMethod =
UserGroupInformation.class.getMethod(
"loginUserFromSubject", Subject.class);
loginUserFromSubjectMethod.invoke(null, (Subject) null);
} catch (NoSuchMethodException e) {
LOG.warn("Could not find method implementations in the shaded jar.", e);
} catch (InvocationTargetException e) {
throw e.getTargetException();
}
loginUser = UserGroupInformation.getLoginUser();
}
LOG.info("Hadoop user set to {}", loginUser);
if (HadoopUtils.isKerberosSecurityEnabled(loginUser)) {
boolean isCredentialsConfigured =
HadoopUtils.areKerberosCredentialsValid(
loginUser, securityConfig.useTicketCache());
LOG.info(
"Kerberos security is enabled and credentials are {}.",
isCredentialsConfigured ? "valid" : "invalid");
}
} catch (Throwable ex) {
throw new SecurityInstallException("Unable to set the Hadoop login user", ex);
}
}
@Override
public void uninstall() {
throw new UnsupportedOperationException();
}
}