blob: 066a0a5b969d9cbb1a570b95b366f993a86b38e0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.security;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.service.AbstractService;
/**
 * Service to renew application delegation tokens.
 *
 * <p>Managed tokens registered through {@link #addApplication} are renewed
 * once immediately and then periodically on a single daemon {@link Timer}.
 * When an application finishes, its tokens are taken out of renewal and, if
 * requested at registration time, cancelled on a dedicated background thread.
 * With keep-alive enabled, that removal is deferred until a deadline that
 * {@link #updateKeepAliveApplications} can push forward.
 */
@Private
@Unstable
public class DelegationTokenRenewer extends AbstractService {
  private static final Log LOG =
      LogFactory.getLog(DelegationTokenRenewer.class);

  public static final String SCHEME = "hdfs";

  // global single timer (daemon); created in start()
  private Timer renewalTimer;

  // delegation token canceler thread
  private final DelegationTokenCancelThread dtCancelThread =
      new DelegationTokenCancelThread();

  // Every token currently scheduled for renewal, across all applications.
  // Membership is decided by token equality (see DelegationTokenToRenew#equals).
  private final Set<DelegationTokenToRenew> delegationTokens =
      Collections.synchronizedSet(new HashSet<DelegationTokenToRenew>());

  // appId => wall-clock time (ms) after which the app's tokens are removed
  private final ConcurrentMap<ApplicationId, Long> delayedRemovalMap =
      new ConcurrentHashMap<ApplicationId, Long>();

  private long tokenRemovalDelayMs;
  private Thread delayedRemovalThread;
  private boolean tokenKeepAliveEnabled;

  public DelegationTokenRenewer() {
    super(DelegationTokenRenewer.class.getName());
  }

  /**
   * Reads configuration. Delayed token removal ("keep-alive") is enabled
   * exactly when log aggregation is enabled, and the removal delay is the
   * configured RM/NM expiry interval.
   */
  @Override
  public synchronized void init(Configuration conf) {
    super.init(conf);
    // NOTE(review): keep-alive is tied to log aggregation, presumably so that
    // tokens stay valid while a finished app's logs are aggregated -- confirm.
    this.tokenKeepAliveEnabled =
        conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
            YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
    // Read as a long to match the field type; getInt would reject intervals
    // larger than Integer.MAX_VALUE.
    this.tokenRemovalDelayMs =
        conf.getLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
            YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
  }

  /**
   * Starts the cancel thread, creates the daemon renewal timer and, when
   * keep-alive is enabled, starts the delayed-removal thread.
   */
  @Override
  public synchronized void start() {
    super.start();
    dtCancelThread.start();
    renewalTimer = new Timer(true);
    if (tokenKeepAliveEnabled) {
      delayedRemovalThread =
          new Thread(new DelayedTokenRemovalRunnable(getConfig()),
              "DelayedTokenCanceller");
      delayedRemovalThread.start();
    }
  }

  /**
   * Stops renewal. Cancels the renewal timer (discarding every scheduled
   * renewal task) and forgets all tracked tokens. Then it interrupts the
   * background threads and waits up to one second for each to exit.
   */
  @Override
  public synchronized void stop() {
    if (renewalTimer != null) {
      renewalTimer.cancel();
    }
    delegationTokens.clear();

    // Remember an interrupt instead of re-asserting it right away: a pending
    // interrupt would make the second join() throw immediately and skip
    // waiting for the delayed-removal thread.
    boolean interrupted = false;
    dtCancelThread.interrupt();
    try {
      dtCancelThread.join(1000);
    } catch (InterruptedException e) {
      LOG.info("Interrupted while joining on delegation token cancel thread.",
          e);
      interrupted = true;
    }
    if (tokenKeepAliveEnabled && delayedRemovalThread != null) {
      delayedRemovalThread.interrupt();
      try {
        delayedRemovalThread.join(1000);
      } catch (InterruptedException e) {
        LOG.info("Interrupted while joining on delayed removal thread.", e);
        interrupted = true;
      }
    }
    super.stop();
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Bookkeeping for one token under renewal: the owning application, the
   * configuration used to renew/cancel it, its latest known expiration date
   * and the currently scheduled renewal task.
   */
  private static class DelegationTokenToRenew {
    public final Token<?> token;
    public final ApplicationId applicationId;
    public final Configuration conf;
    public long expirationDate;
    public TimerTask timerTask;
    public final boolean shouldCancelAtEnd;

    /**
     * @throws IllegalArgumentException if token, jId or conf is null
     */
    public DelegationTokenToRenew(
        ApplicationId jId, Token<?> token,
        Configuration conf, long expirationDate, boolean shouldCancelAtEnd) {
      this.token = token;
      this.applicationId = jId;
      this.conf = conf;
      this.expirationDate = expirationDate;
      this.timerTask = null;
      this.shouldCancelAtEnd = shouldCancelAtEnd;
      if (this.token == null || this.applicationId == null
          || this.conf == null) {
        throw new IllegalArgumentException("Invalid params to renew token" +
            ";token=" + this.token +
            ";appId=" + this.applicationId +
            ";conf=" + this.conf);
      }
    }

    public void setTimerTask(TimerTask tTask) {
      timerTask = tTask;
    }

    @Override
    public String toString() {
      return token + ";exp=" + expirationDate;
    }

    // Identity is the token alone, so the same token registered by two
    // applications occupies a single entry in the tracking set.
    @Override
    public boolean equals(Object obj) {
      return obj instanceof DelegationTokenToRenew &&
          token.equals(((DelegationTokenToRenew) obj).token);
    }

    @Override
    public int hashCode() {
      return token.hashCode();
    }
  }

  /**
   * Daemon thread that cancels queued tokens one at a time as the RM login
   * user. An unexpected (non-IO) failure terminates the whole process.
   */
  private static class DelegationTokenCancelThread extends Thread {
    private static class TokenWithConf {
      Token<?> token;
      Configuration conf;

      TokenWithConf(Token<?> token, Configuration conf) {
        this.token = token;
        this.conf = conf;
      }
    }

    private final LinkedBlockingQueue<TokenWithConf> queue =
        new LinkedBlockingQueue<TokenWithConf>();

    public DelegationTokenCancelThread() {
      super("Delegation Token Canceler");
      setDaemon(true);
    }

    /**
     * Enqueues a token for asynchronous cancellation, retrying every 100 ms
     * if the queue refuses it.
     *
     * @throws RuntimeException wrapping InterruptedException if interrupted
     *     while retrying; the interrupt status is restored first
     */
    public void cancelToken(Token<?> token,
        Configuration conf) {
      TokenWithConf tokenWithConf = new TokenWithConf(token, conf);
      while (!queue.offer(tokenWithConf)) {
        LOG.warn("Unable to add token " + token + " for cancellation. " +
            "Will retry..");
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          // Let the caller observe the interrupt despite the unchecked wrap.
          Thread.currentThread().interrupt();
          throw new RuntimeException(e);
        }
      }
    }

    @Override
    public void run() {
      TokenWithConf tokenWithConf = null;
      while (true) {
        try {
          tokenWithConf = queue.take();
          final TokenWithConf current = tokenWithConf;
          if (LOG.isDebugEnabled()) {
            LOG.debug("Canceling token " + tokenWithConf.token.getService());
          }
          // need to use doAs so that http can find the kerberos tgt
          UserGroupInformation.getLoginUser()
              .doAs(new PrivilegedExceptionAction<Void>() {
                @Override
                public Void run() throws Exception {
                  current.token.cancel(current.conf);
                  return null;
                }
              });
        } catch (IOException e) {
          // Cancellation is best effort: log and move on to the next token.
          LOG.warn("Failed to cancel token " + tokenWithConf.token + " " +
              StringUtils.stringifyException(e));
        } catch (InterruptedException ie) {
          // Interrupted (e.g. by stop()): exit the thread.
          return;
        } catch (Throwable t) {
          LOG.warn("Got exception " + StringUtils.stringifyException(t) +
              ". Exiting..");
          System.exit(-1);
        }
      }
    }
  }

  //adding token
  private void addTokenToList(DelegationTokenToRenew t) {
    delegationTokens.add(t);
  }

  /**
   * Add application tokens for renewal.
   *
   * <p>Every managed token is renewed once immediately. Timers are only
   * scheduled after all of them renewed successfully, so a failure leaves
   * nothing registered for this call.
   *
   * @param applicationId added application
   * @param ts tokens; {@code null} means there is nothing to register
   * @param shouldCancelAtEnd true if tokens should be canceled when the app is
   *     done else false.
   * @throws IOException if an initial renewal fails
   */
  public void addApplication(
      ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd)
      throws IOException {
    if (ts == null) {
      return; //nothing to add
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("Registering tokens for renewal for:" +
          " appId = " + applicationId);
    }

    Collection<Token<?>> tokens = ts.getAllTokens();
    long now = System.currentTimeMillis();

    // find tokens for renewal, but don't add timers until we know
    // all renewable tokens are valid
    Set<DelegationTokenToRenew> dtrs = new HashSet<DelegationTokenToRenew>();
    for (Token<?> token : tokens) {
      // first renew happens immediately
      if (token.isManaged()) {
        DelegationTokenToRenew dtr =
            new DelegationTokenToRenew(applicationId, token, getConfig(), now,
                shouldCancelAtEnd);
        renewToken(dtr);
        dtrs.add(dtr);
      }
    }
    for (DelegationTokenToRenew dtr : dtrs) {
      addTokenToList(dtr);
      setTimerForTokenRenewal(dtr);
      if (LOG.isDebugEnabled()) {
        LOG.debug("Registering token for renewal for:" +
            " service = " + dtr.token.getService() +
            " for appId = " + applicationId);
      }
    }
  }

  /**
   * Timer task that renews one token and schedules the next renewal. On any
   * failure the token is dropped from renewal and not rescheduled.
   */
  private class RenewalTimerTask extends TimerTask {
    private DelegationTokenToRenew dttr;
    private boolean cancelled = false;

    RenewalTimerTask(DelegationTokenToRenew t) {
      dttr = t;
    }

    @Override
    public synchronized void run() {
      if (cancelled) {
        return;
      }

      Token<?> token = dttr.token;
      try {
        renewToken(dttr);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Renewing delegation-token for:" + token.getService() +
              "; new expiration;" + dttr.expirationDate);
        }

        setTimerForTokenRenewal(dttr);// set the next one
      } catch (Exception e) {
        LOG.error("Exception renewing token" + token + ". Not rescheduled", e);
        removeFailedDelegationToken(dttr);
      }
    }

    // Synchronized with run() so a cancel never interleaves with a renewal.
    @Override
    public synchronized boolean cancel() {
      cancelled = true;
      return super.cancel();
    }
  }

  /**
   * Schedules a fresh renewal task for the token. The task fires roughly 90%
   * of the way from now to the token's current expiration date.
   */
  private void setTimerForTokenRenewal(DelegationTokenToRenew token)
      throws IOException {

    // calculate timer time
    long expiresIn = token.expirationDate - System.currentTimeMillis();
    long renewIn = token.expirationDate - expiresIn / 10; // little bit before the expiration

    // need to create new task every time
    TimerTask tTask = new RenewalTimerTask(token);
    token.setTimerTask(tTask); // keep reference to the timer

    renewalTimer.schedule(token.timerTask, new Date(renewIn));
  }

  // renew a token; stores the new expiration date returned by the renewer
  private void renewToken(final DelegationTokenToRenew dttr)
      throws IOException {
    // need to use doAs so that http can find the kerberos tgt
    // NOTE: token renewers should be responsible for the correct UGI!
    try {
      dttr.expirationDate = UserGroupInformation.getLoginUser().doAs(
          new PrivilegedExceptionAction<Long>() {
            @Override
            public Long run() throws Exception {
              return dttr.token.renew(dttr.conf);
            }
          });
    } catch (InterruptedException e) {
      throw new IOException(e);
    }
  }

  // cancel a token (asynchronously) if it was registered with shouldCancelAtEnd
  private void cancelToken(DelegationTokenToRenew t) {
    if (t.shouldCancelAtEnd) {
      dtCancelThread.cancelToken(t.token, t.conf);
    } else {
      LOG.info("Did not cancel " + t);
    }
  }

  /**
   * Removes a token whose renewal failed and cancels its timer task. The
   * token itself is not cancelled.
   *
   * @param t the token entry whose renewal failed
   */
  private void removeFailedDelegationToken(DelegationTokenToRenew t) {
    ApplicationId applicationId = t.applicationId;
    if (LOG.isDebugEnabled()) {
      LOG.debug("removing failed delegation token for appid=" + applicationId +
          ";t=" + t.token.getService());
    }
    delegationTokens.remove(t);
    // cancel the timer
    if (t.timerTask != null) {
      t.timerTask.cancel();
    }
  }

  /**
   * Removing delegation token for completed applications. Without keep-alive
   * the tokens are removed immediately. Otherwise removal is deferred by the
   * configured delay.
   *
   * @param applicationId completed application
   */
  public void applicationFinished(ApplicationId applicationId) {
    if (!tokenKeepAliveEnabled) {
      removeApplicationFromRenewal(applicationId);
    } else {
      delayedRemovalMap.put(applicationId, System.currentTimeMillis()
          + tokenRemovalDelayMs);
    }
  }

  /**
   * Add a list of applications to the keep alive list. If an appId already
   * exists, update its keep-alive time. Has no effect when keep-alive is
   * disabled or the list is null/empty.
   *
   * @param appIds
   *          the list of applicationIds to be kept alive.
   */
  public void updateKeepAliveApplications(List<ApplicationId> appIds) {
    if (tokenKeepAliveEnabled && appIds != null && appIds.size() > 0) {
      for (ApplicationId appId : appIds) {
        delayedRemovalMap.put(appId, System.currentTimeMillis()
            + tokenRemovalDelayMs);
      }
    }
  }

  /**
   * Stops renewing every token of the given application, cancels their timer
   * tasks and hands them to {@link #cancelToken}.
   *
   * <p>Matching entries are detached while holding the set's monitor, but the
   * timer tasks are cancelled only after it is released. {@code
   * RenewalTimerTask.cancel()} locks the task, and a failing {@code
   * RenewalTimerTask.run()} holds that lock while it calls {@code
   * delegationTokens.remove(...)}. Cancelling under the set lock would
   * therefore acquire the two monitors in opposite orders and could deadlock.
   */
  private void removeApplicationFromRenewal(ApplicationId applicationId) {
    List<DelegationTokenToRenew> removed =
        new ArrayList<DelegationTokenToRenew>();
    synchronized (delegationTokens) {
      Iterator<DelegationTokenToRenew> it = delegationTokens.iterator();
      while (it.hasNext()) {
        DelegationTokenToRenew dttr = it.next();
        if (dttr.applicationId.equals(applicationId)) {
          it.remove();
          removed.add(dttr);
        }
      }
    }

    for (DelegationTokenToRenew dttr : removed) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Removing delegation token for appId=" + applicationId +
            "; token=" + dttr.token.getService());
      }
      // cancel the timer
      if (dttr.timerTask != null) {
        dttr.timerTask.cancel();
      }
      // cancel the token
      cancelToken(dttr);
    }
  }

  /**
   * Takes care of cancelling app delegation tokens after the configured
   * cancellation delay, taking into consideration keep-alive requests.
   */
  private class DelayedTokenRemovalRunnable implements Runnable {

    private long waitTimeMs;

    DelayedTokenRemovalRunnable(Configuration conf) {
      waitTimeMs =
          conf.getLong(
              YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
              YarnConfiguration.DEFAULT_RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS);
    }

    @Override
    public void run() {
      List<ApplicationId> toCancel = new ArrayList<ApplicationId>();
      while (!Thread.currentThread().isInterrupted()) {
        toCancel.clear();
        long now = System.currentTimeMillis();
        for (Entry<ApplicationId, Long> e : delayedRemovalMap.entrySet()) {
          ApplicationId appId = e.getKey();
          Long deadline = e.getValue();
          // Claim the entry only if its deadline is still the one we saw.
          // A concurrent keep-alive update replaces the value, so this
          // conditional remove fails and the app's tokens are kept.
          if (deadline < now && delayedRemovalMap.remove(appId, deadline)) {
            toCancel.add(appId);
          }
        }
        for (ApplicationId appId : toCancel) {
          removeApplicationFromRenewal(appId);
        }

        synchronized (this) {
          try {
            wait(waitTimeMs);
          } catch (InterruptedException e) {
            LOG.info("Delayed Deletion Thread Interrupted. Shutting it down");
            return;
          }
        }
      }
    }
  }
}