// blob: c892b10e2f35d430fafa50f8b6ece24fc30d8ee7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.server.services.security;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.util.Time;
import org.apache.slider.common.tools.SliderUtils;
import org.apache.slider.server.appmaster.SliderAppMaster;
import org.apache.slider.server.appmaster.actions.AsyncAction;
import org.apache.slider.server.appmaster.actions.QueueAccess;
import org.apache.slider.server.appmaster.actions.RenewingAction;
import org.apache.slider.server.appmaster.state.AppState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.security.PrivilegedExceptionAction;
import java.text.DateFormat;
import java.util.Date;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
 * Obtains an HDFS delegation token, attaches it to the current user's
 * credentials and queues a recurring action that renews (or, when needed,
 * replaces) that token.
 *
 * <p>Token operations are performed as a separate "remote" user that is logged
 * in from the NameNode principal and keytab named in the configuration. The
 * token itself is added to the {@link UserGroupInformation} that was current
 * when this manager was constructed.
 *
 * <p>Subclasses may override {@link #getRenewingLimit()},
 * {@link #queue(RenewingAction)}, {@link #getRenewingActionName()} and
 * {@link #getRemoteFileSystemForRenewal(Configuration)} to adjust scheduling
 * or to supply a different filesystem.
 */
public class FsDelegationTokenManager {
  private static final Logger
      log = LoggerFactory.getLogger(FsDelegationTokenManager.class);

  /** Queue on which the renewing action is scheduled and later removed. */
  private final QueueAccess queue;
  /** User current at construction time; receives the delegation token. */
  private final UserGroupInformation currentUser;
  /** Scheduled wrapper around {@link #renewAction}; null until acquisition. */
  private RenewingAction<RenewAction> renewingAction;
  /** Keytab-authenticated user used for all token operations; created lazily. */
  private UserGroupInformation remoteUser;
  /** Configured NameNode delegation token renew interval, in milliseconds. */
  private long renewInterval;
  /** Action owning the managed token; null until acquisition. */
  private RenewAction renewAction;
  /** Lazily generated unique name of the renewing action. */
  private String tokenName;

  /**
   * Creates a manager bound to the given action queue and to the user that is
   * current at the time of the call.
   *
   * @param queue queue used to schedule and remove the renewing action
   * @throws IOException if the current user cannot be determined
   */
  public FsDelegationTokenManager(QueueAccess queue) throws IOException {
    this.queue = queue;
    this.currentUser = UserGroupInformation.getCurrentUser();
  }

  /**
   * Logs in the remote user from the NameNode Kerberos principal and keytab
   * found in the configuration, and stores it in {@link #remoteUser}.
   *
   * @param configuration source of the principal and keytab settings
   * @throws IOException if the local host name cannot be resolved or the
   *                     keytab login fails
   */
  private void createRemoteUser(Configuration configuration) throws IOException {
    // work on a copy so the caller's configuration is not modified
    Configuration loginConfig = new Configuration(configuration);
    loginConfig.set(DFSConfigKeys.HADOOP_SECURITY_AUTHENTICATION,
        "kerberos");
    // using HDFS principal...
    String principal = SecurityUtil.getServerPrincipal(
        loginConfig.get(DFSConfigKeys.DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY),
        InetAddress.getLocalHost().getCanonicalHostName());
    String keytab =
        loginConfig.get(DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY);
    this.remoteUser =
        UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
    log.info("Created remote user {}. UGI reports current user is {}",
        this.remoteUser, UserGroupInformation.getCurrentUser());
  }

  /**
   * Acquires an HDFS delegation token and starts its periodic renewal.
   *
   * <p>The remote user is created on first use. If the cluster is secure and
   * no renewing action exists yet, a new token is obtained, added to the
   * current user's credentials, and a renewing action is queued at the
   * configured NameNode renew interval. Otherwise nothing further happens.
   *
   * @param configuration cluster configuration
   * @throws IOException if login or token acquisition fails
   * @throws InterruptedException if a privileged action is interrupted
   */
  public void acquireDelegationToken(Configuration configuration)
      throws IOException, InterruptedException {
    if (remoteUser == null) {
      createRemoteUser(configuration);
    }
    if (SliderUtils.isHadoopClusterSecure(configuration) &&
        renewingAction == null) {
      renewInterval = configuration.getLong(
          DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
          DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
      // constructor of action will retrieve initial token. One may already be
      // associated with user, but its lifecycle/management is not clear so
      // let's create and manage a token explicitly
      renewAction = new RenewAction("HDFS renew",
          configuration);
      // set retrieved token as the user associated delegation token and
      // start a renewing action to renew
      Token<?> token = renewAction.getToken();
      currentUser.addToken(token.getService(), token);
      log.info("HDFS delegation token {} acquired and set as credential for current user", token);
      // clamp rather than blindly cast: a configured interval larger than
      // Integer.MAX_VALUE ms would otherwise wrap to a bogus (negative) delay
      int schedulingInterval = clampToInt(renewInterval);
      renewingAction = new RenewingAction<RenewAction>(renewAction,
          schedulingInterval,
          schedulingInterval,
          TimeUnit.MILLISECONDS,
          getRenewingLimit());
      log.info("queuing HDFS delegation token renewal interval of {} milliseconds",
          renewInterval);
      queue(renewingAction);
    }
  }

  /**
   * Removes the renewing action from the queue and, if a token was acquired,
   * cancels it.
   *
   * @param configuration configuration passed to the token's cancel call
   * @throws IOException if cancelling the token fails
   * @throws InterruptedException if cancelling the token is interrupted
   */
  public void cancelDelegationToken(Configuration configuration)
      throws IOException, InterruptedException {
    queue.removeRenewingAction(getRenewingActionName());
    if (renewAction != null) {
      renewAction.getToken().cancel(configuration);
    }
    log.info("Renewing action {} removed and HDFS delegation token renewal "
        + "cancelled", getRenewingActionName());
  }

  /**
   * Returns the limit handed to the {@link RenewingAction} constructor.
   *
   * @return the limit; this implementation returns 0
   */
  protected int getRenewingLimit() {
    return 0;
  }

  /**
   * Schedules the renewing action on the queue under
   * {@link #getRenewingActionName()}.
   *
   * @param action the renewing action to schedule
   */
  protected void queue(RenewingAction<RenewAction> action) {
    queue.renewing(getRenewingActionName(),
        action);
  }

  /**
   * Returns the name under which the renewing action is queued. The name is
   * generated once, with a random UUID suffix, and then reused.
   *
   * @return the renewing action name
   */
  protected String getRenewingActionName() {
    if (tokenName == null) {
      tokenName = "HDFS renewing token " + UUID.randomUUID();
    }
    return tokenName;
  }

  /**
   * Narrows a long to an int, saturating at the int range limits instead of
   * wrapping.
   *
   * @param value value to narrow
   * @return {@code value} limited to [Integer.MIN_VALUE, Integer.MAX_VALUE]
   */
  private static int clampToInt(long value) {
    if (value > Integer.MAX_VALUE) {
      return Integer.MAX_VALUE;
    }
    if (value < Integer.MIN_VALUE) {
      return Integer.MIN_VALUE;
    }
    return (int) value;
  }

  /**
   * Queue action that owns the managed delegation token: it obtains the
   * initial token on construction and renews or replaces it on each
   * execution. Access to the token is guarded by the filesystem instance's
   * monitor.
   */
  class RenewAction extends AsyncAction {
    Configuration configuration;
    Token<?> token;
    /** Max date of the current token, in milliseconds since the epoch. */
    private long tokenExpiryTime;
    private final FileSystem fs;

    /**
     * Creates the action and fetches the initial delegation token as the
     * remote user.
     *
     * @param name action name
     * @param configuration configuration used to create the filesystem
     * @throws IOException if the filesystem or the token cannot be obtained,
     *                     or the token identifier cannot be decoded
     * @throws InterruptedException if a privileged action is interrupted
     */
    RenewAction(String name,
        Configuration configuration)
        throws IOException, InterruptedException {
      super(name);
      this.configuration = configuration;
      fs = getFileSystem();
      // get initial token by creating a kerberos authenticated user and
      // invoking token methods as that user
      synchronized (fs) {
        this.token = remoteUser.doAs(new PrivilegedExceptionAction<Token<?>>() {
          @Override
          public Token<?> run() throws Exception {
            String renewer = remoteUser.getShortUserName();
            log.info("Obtaining HDFS delgation token with user {}",
                renewer);
            Token<?> initial = fs.getDelegationToken(renewer);
            if (initial == null) {
              // fail with a diagnosable error rather than an NPE further down
              throw new IOException(
                  "Filesystem returned no delegation token for renewer "
                  + renewer);
            }
            tokenExpiryTime = getTokenExpiryTime(initial);
            log.info("Initial delegation token obtained with expiry time of {}", getPrintableExpirationTime(tokenExpiryTime));
            return initial;
          }
        });
      }
      log.info("Initial request returned delegation token {}", token);
    }

    /**
     * Reads the max date from a delegation token's identifier.
     *
     * @param candidate token to inspect
     * @return the identifier's max date
     * @throws IOException if the identifier cannot be decoded or is not an
     *                     {@link AbstractDelegationTokenIdentifier}
     */
    private long getTokenExpiryTime(Token<?> candidate) throws IOException {
      TokenIdentifier id = candidate.decodeIdentifier();
      if (!(id instanceof AbstractDelegationTokenIdentifier)) {
        throw new IOException(
            "Token does not carry a delegation token identifier: "
            + candidate);
      }
      return ((AbstractDelegationTokenIdentifier) id).getMaxDate();
    }

    /**
     * Creates, as the remote user, a filesystem from a copy of the
     * configuration that has the HDFS filesystem cache disabled.
     *
     * @return the filesystem used for token operations
     * @throws IOException if the filesystem cannot be created
     * @throws InterruptedException if the privileged action is interrupted
     */
    protected FileSystem getFileSystem()
        throws IOException, InterruptedException {
      // return non-cache FS reference
      return remoteUser.doAs(new PrivilegedExceptionAction<FileSystem>() {
        @Override
        public FileSystem run() throws Exception {
          Configuration config = new Configuration(configuration);
          config.setBoolean("fs.hdfs.impl.disable.cache", true);
          return getRemoteFileSystemForRenewal(config);
        }
      });
    }

    /**
     * Renews the token as the remote user. If the token's max date falls
     * before the next renew interval, or the renewal raises an
     * {@link IOException}, a new token is requested instead.
     *
     * @param appMaster unused
     * @param queueService unused
     * @param appState unused
     * @throws Exception if neither renewal nor replacement succeeds
     */
    @Override
    public void execute(SliderAppMaster appMaster, QueueAccess queueService,
        AppState appState)
        throws Exception {
      if (fs == null) {
        return;
      }
      synchronized (fs) {
        try {
          remoteUser.doAs(new PrivilegedExceptionAction<Long>() {
            @Override
            public Long run() throws Exception {
              long expires = token.renew(fs.getConf());
              log.info("HDFS delegation token renewed. Renewal cycle ends at {}",
                  getPrintableExpirationTime(expires));
              return expires;
            }
          });
          long calculatedInterval = tokenExpiryTime - Time.now();
          if (calculatedInterval < renewInterval) {
            // time to get a new token since the token will expire before
            // next renewal interval. Could modify this to be closer to expiry
            // time if deemed necessary....
            log.info("Interval of {} less than renew interval. Getting new token",
                calculatedInterval);
            getNewToken();
          } else {
            updateRenewalTime(renewInterval);
          }
        } catch (IOException ie) {
          // token has expired. get a new one...
          // NOTE: an IOException from the getNewToken() call above also lands
          // here, so a failed replacement is retried once before propagating.
          log.info("Exception raised by renew", ie);
          getNewToken();
        }
      }
    }

    /**
     * Formats a timestamp with the default date/time format.
     *
     * @param expires milliseconds since the epoch
     * @return the formatted date and time
     */
    private String getPrintableExpirationTime(long expires) {
      return DateFormat.getDateTimeInstance().format(new Date(expires));
    }

    /**
     * Requests fresh delegation tokens as the remote user, picks the one whose
     * service matches the current token, installs it as the managed token and
     * as a credential of the current user, and reschedules renewal.
     *
     * @throws InterruptedException if the privileged action is interrupted
     * @throws IOException if no usable replacement token could be obtained
     */
    private void getNewToken()
        throws InterruptedException, IOException {
      try {
        Text service = token.getService();
        Token<?>[] tokens = remoteUser.doAs(new PrivilegedExceptionAction<Token<?>[]>() {
          @Override
          public Token<?>[] run() throws Exception {
            return fs.addDelegationTokens(remoteUser.getShortUserName(), null);
          }
        });
        if (tokens == null || tokens.length == 0) {
          throw new IOException("addDelegationTokens returned no tokens");
        }
        Token<?> replacement = findMatchingToken(service, tokens);
        if (replacement == null) {
          // previously this fell through to a NullPointerException
          throw new IOException(
              "addDelegationTokens returned no token for service " + service);
        }
        // decode the expiry before touching any state so that a failure
        // leaves the previous token and expiry time consistent
        long replacementExpiry = getTokenExpiryTime(replacement);
        token = replacement;
        tokenExpiryTime = replacementExpiry;
        currentUser.addToken(replacement.getService(), replacement);
        log.info("Expired HDFS delegation token replaced and added as credential"
            + " to current user. Token expires at {}",
            getPrintableExpirationTime(tokenExpiryTime));
        updateRenewalTime(renewInterval);
      } catch (IOException ie2) {
        throw new IOException("Can't get new delegation token ", ie2);
      }
    }

    /**
     * Reschedules the renewing action to run after nine tenths of the given
     * interval.
     *
     * @param interval base interval in milliseconds
     */
    private void updateRenewalTime(long interval) {
      long delay = interval - interval / 10;
      renewingAction.updateInterval(delay, TimeUnit.MILLISECONDS);
      log.info("Token renewal set for {} ms from now", delay);
    }

    /**
     * Finds the first token whose service equals the given service.
     *
     * @param service service to look for
     * @param tokens candidate tokens
     * @return the first matching token, or null if none matches
     */
    private Token<?> findMatchingToken(Text service, Token<?>[] tokens) {
      for (Token<?> candidate : tokens) {
        if (candidate.getService().equals(service)) {
          return candidate;
        }
      }
      return null;
    }

    /**
     * Returns the currently managed token.
     *
     * @return the token
     */
    Token<?> getToken() {
      synchronized (fs) {
        return token;
      }
    }
  }

  /**
   * Creates the filesystem used for token acquisition and renewal.
   *
   * @param config configuration to create the filesystem from
   * @return the filesystem obtained via {@link FileSystem#get(Configuration)}
   * @throws IOException if the filesystem cannot be created
   */
  protected FileSystem getRemoteFileSystemForRenewal(Configuration config)
      throws IOException {
    return FileSystem.get(config);
  }
}