| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.yarn.server.resourcemanager.security; |
| |
| import java.io.IOException; |
| import java.security.PrivilegedExceptionAction; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Date; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.Timer; |
| import java.util.TimerTask; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.LinkedBlockingQueue; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.classification.InterfaceStability.Unstable; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.security.Credentials; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.util.StringUtils; |
| import org.apache.hadoop.yarn.api.records.ApplicationId; |
| import org.apache.hadoop.yarn.conf.YarnConfiguration; |
| import org.apache.hadoop.yarn.service.AbstractService; |
| |
| /** |
| * Service to renew application delegation tokens. |
| */ |
| @Private |
| @Unstable |
| public class DelegationTokenRenewer extends AbstractService { |
| |
| private static final Log LOG = |
| LogFactory.getLog(DelegationTokenRenewer.class); |
| |
| public static final String SCHEME = "hdfs"; |
| |
| // global single timer (daemon) |
| private Timer renewalTimer; |
| |
| // delegation token canceler thread |
| private DelegationTokenCancelThread dtCancelThread = |
| new DelegationTokenCancelThread(); |
| |
| // managing the list of tokens using Map |
| // appId=>List<tokens> |
| private Set<DelegationTokenToRenew> delegationTokens = |
| Collections.synchronizedSet(new HashSet<DelegationTokenToRenew>()); |
| |
| private final ConcurrentMap<ApplicationId, Long> delayedRemovalMap = |
| new ConcurrentHashMap<ApplicationId, Long>(); |
| |
| private long tokenRemovalDelayMs; |
| |
| private Thread delayedRemovalThread; |
| |
| private boolean tokenKeepAliveEnabled; |
| |
| public DelegationTokenRenewer() { |
| super(DelegationTokenRenewer.class.getName()); |
| } |
| |
| @Override |
| public synchronized void init(Configuration conf) { |
| super.init(conf); |
| this.tokenKeepAliveEnabled = |
| conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, |
| YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED); |
| this.tokenRemovalDelayMs = |
| conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, |
| YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); |
| } |
| |
| @Override |
| public synchronized void start() { |
| super.start(); |
| |
| dtCancelThread.start(); |
| renewalTimer = new Timer(true); |
| if (tokenKeepAliveEnabled) { |
| delayedRemovalThread = |
| new Thread(new DelayedTokenRemovalRunnable(getConfig()), |
| "DelayedTokenCanceller"); |
| delayedRemovalThread.start(); |
| } |
| } |
| |
| @Override |
| public synchronized void stop() { |
| if (renewalTimer != null) { |
| renewalTimer.cancel(); |
| } |
| delegationTokens.clear(); |
| |
| dtCancelThread.interrupt(); |
| try { |
| dtCancelThread.join(1000); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } |
| if (tokenKeepAliveEnabled && delayedRemovalThread != null) { |
| delayedRemovalThread.interrupt(); |
| try { |
| delayedRemovalThread.join(1000); |
| } catch (InterruptedException e) { |
| LOG.info("Interrupted while joining on delayed removal thread.", e); |
| } |
| } |
| |
| super.stop(); |
| } |
| |
| /** |
| * class that is used for keeping tracks of DT to renew |
| * |
| */ |
| private static class DelegationTokenToRenew { |
| public final Token<?> token; |
| public final ApplicationId applicationId; |
| public final Configuration conf; |
| public long expirationDate; |
| public TimerTask timerTask; |
| public final boolean shouldCancelAtEnd; |
| |
| public DelegationTokenToRenew( |
| ApplicationId jId, Token<?> token, |
| Configuration conf, long expirationDate, boolean shouldCancelAtEnd) { |
| this.token = token; |
| this.applicationId = jId; |
| this.conf = conf; |
| this.expirationDate = expirationDate; |
| this.timerTask = null; |
| this.shouldCancelAtEnd = shouldCancelAtEnd; |
| if (this.token==null || this.applicationId==null || this.conf==null) { |
| throw new IllegalArgumentException("Invalid params to renew token" + |
| ";token=" + this.token + |
| ";appId=" + this.applicationId + |
| ";conf=" + this.conf); |
| } |
| } |
| |
| public void setTimerTask(TimerTask tTask) { |
| timerTask = tTask; |
| } |
| |
| @Override |
| public String toString() { |
| return token + ";exp=" + expirationDate; |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| return obj instanceof DelegationTokenToRenew && |
| token.equals(((DelegationTokenToRenew)obj).token); |
| } |
| |
| @Override |
| public int hashCode() { |
| return token.hashCode(); |
| } |
| } |
| |
| |
| private static class DelegationTokenCancelThread extends Thread { |
| private static class TokenWithConf { |
| Token<?> token; |
| Configuration conf; |
| TokenWithConf(Token<?> token, Configuration conf) { |
| this.token = token; |
| this.conf = conf; |
| } |
| } |
| private LinkedBlockingQueue<TokenWithConf> queue = |
| new LinkedBlockingQueue<TokenWithConf>(); |
| |
| public DelegationTokenCancelThread() { |
| super("Delegation Token Canceler"); |
| setDaemon(true); |
| } |
| public void cancelToken(Token<?> token, |
| Configuration conf) { |
| TokenWithConf tokenWithConf = new TokenWithConf(token, conf); |
| while (!queue.offer(tokenWithConf)) { |
| LOG.warn("Unable to add token " + token + " for cancellation. " + |
| "Will retry.."); |
| try { |
| Thread.sleep(100); |
| } catch (InterruptedException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| } |
| |
| public void run() { |
| TokenWithConf tokenWithConf = null; |
| while (true) { |
| try { |
| tokenWithConf = queue.take(); |
| final TokenWithConf current = tokenWithConf; |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Canceling token " + tokenWithConf.token.getService()); |
| } |
| // need to use doAs so that http can find the kerberos tgt |
| UserGroupInformation.getLoginUser() |
| .doAs(new PrivilegedExceptionAction<Void>(){ |
| |
| @Override |
| public Void run() throws Exception { |
| current.token.cancel(current.conf); |
| return null; |
| } |
| }); |
| } catch (IOException e) { |
| LOG.warn("Failed to cancel token " + tokenWithConf.token + " " + |
| StringUtils.stringifyException(e)); |
| } catch (InterruptedException ie) { |
| return; |
| } catch (Throwable t) { |
| LOG.warn("Got exception " + StringUtils.stringifyException(t) + |
| ". Exiting.."); |
| System.exit(-1); |
| } |
| } |
| } |
| } |
| //adding token |
| private void addTokenToList(DelegationTokenToRenew t) { |
| delegationTokens.add(t); |
| } |
| |
| /** |
| * Add application tokens for renewal. |
| * @param applicationId added application |
| * @param ts tokens |
| * @param shouldCancelAtEnd true if tokens should be canceled when the app is |
| * done else false. |
| * @throws IOException |
| */ |
| public void addApplication( |
| ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd) |
| throws IOException { |
| if (ts == null) { |
| return; //nothing to add |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Registering tokens for renewal for:" + |
| " appId = " + applicationId); |
| } |
| |
| Collection <Token<?>> tokens = ts.getAllTokens(); |
| long now = System.currentTimeMillis(); |
| |
| // find tokens for renewal, but don't add timers until we know |
| // all renewable tokens are valid |
| Set<DelegationTokenToRenew> dtrs = new HashSet<DelegationTokenToRenew>(); |
| for(Token<?> token : tokens) { |
| // first renew happens immediately |
| if (token.isManaged()) { |
| DelegationTokenToRenew dtr = |
| new DelegationTokenToRenew(applicationId, token, getConfig(), now, |
| shouldCancelAtEnd); |
| renewToken(dtr); |
| dtrs.add(dtr); |
| } |
| } |
| for (DelegationTokenToRenew dtr : dtrs) { |
| addTokenToList(dtr); |
| setTimerForTokenRenewal(dtr); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Registering token for renewal for:" + |
| " service = " + dtr.token.getService() + |
| " for appId = " + applicationId); |
| } |
| } |
| } |
| |
| /** |
| * Task - to renew a token |
| * |
| */ |
| private class RenewalTimerTask extends TimerTask { |
| private DelegationTokenToRenew dttr; |
| private boolean cancelled = false; |
| |
| RenewalTimerTask(DelegationTokenToRenew t) { |
| dttr = t; |
| } |
| |
| @Override |
| public synchronized void run() { |
| if (cancelled) { |
| return; |
| } |
| |
| Token<?> token = dttr.token; |
| try { |
| renewToken(dttr); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Renewing delegation-token for:" + token.getService() + |
| "; new expiration;" + dttr.expirationDate); |
| } |
| |
| setTimerForTokenRenewal(dttr);// set the next one |
| } catch (Exception e) { |
| LOG.error("Exception renewing token" + token + ". Not rescheduled", e); |
| removeFailedDelegationToken(dttr); |
| } |
| } |
| |
| @Override |
| public synchronized boolean cancel() { |
| cancelled = true; |
| return super.cancel(); |
| } |
| } |
| |
| /** |
| * set task to renew the token |
| */ |
| private void setTimerForTokenRenewal(DelegationTokenToRenew token) |
| throws IOException { |
| |
| // calculate timer time |
| long expiresIn = token.expirationDate - System.currentTimeMillis(); |
| long renewIn = token.expirationDate - expiresIn/10; // little bit before the expiration |
| |
| // need to create new task every time |
| TimerTask tTask = new RenewalTimerTask(token); |
| token.setTimerTask(tTask); // keep reference to the timer |
| |
| renewalTimer.schedule(token.timerTask, new Date(renewIn)); |
| } |
| |
| // renew a token |
| private void renewToken(final DelegationTokenToRenew dttr) |
| throws IOException { |
| // need to use doAs so that http can find the kerberos tgt |
| // NOTE: token renewers should be responsible for the correct UGI! |
| try { |
| dttr.expirationDate = UserGroupInformation.getLoginUser().doAs( |
| new PrivilegedExceptionAction<Long>(){ |
| @Override |
| public Long run() throws Exception { |
| return dttr.token.renew(dttr.conf); |
| } |
| }); |
| } catch (InterruptedException e) { |
| throw new IOException(e); |
| } |
| } |
| |
| // cancel a token |
| private void cancelToken(DelegationTokenToRenew t) { |
| if(t.shouldCancelAtEnd) { |
| dtCancelThread.cancelToken(t.token, t.conf); |
| } else { |
| LOG.info("Did not cancel "+t); |
| } |
| } |
| |
| /** |
| * removing failed DT |
| * @param applicationId |
| */ |
| private void removeFailedDelegationToken(DelegationTokenToRenew t) { |
| ApplicationId applicationId = t.applicationId; |
| if (LOG.isDebugEnabled()) |
| LOG.debug("removing failed delegation token for appid=" + applicationId + |
| ";t=" + t.token.getService()); |
| delegationTokens.remove(t); |
| // cancel the timer |
| if(t.timerTask!=null) |
| t.timerTask.cancel(); |
| } |
| |
| /** |
| * Removing delegation token for completed applications. |
| * @param applicationId completed application |
| */ |
| public void applicationFinished(ApplicationId applicationId) { |
| if (!tokenKeepAliveEnabled) { |
| removeApplicationFromRenewal(applicationId); |
| } else { |
| delayedRemovalMap.put(applicationId, System.currentTimeMillis() |
| + tokenRemovalDelayMs); |
| } |
| } |
| |
| /** |
| * Add a list of applications to the keep alive list. If an appId already |
| * exists, update it's keep-alive time. |
| * |
| * @param appIds |
| * the list of applicationIds to be kept alive. |
| * |
| */ |
| public void updateKeepAliveApplications(List<ApplicationId> appIds) { |
| if (tokenKeepAliveEnabled && appIds != null && appIds.size() > 0) { |
| for (ApplicationId appId : appIds) { |
| delayedRemovalMap.put(appId, System.currentTimeMillis() |
| + tokenRemovalDelayMs); |
| } |
| } |
| } |
| |
| private void removeApplicationFromRenewal(ApplicationId applicationId) { |
| synchronized (delegationTokens) { |
| Iterator<DelegationTokenToRenew> it = delegationTokens.iterator(); |
| while(it.hasNext()) { |
| DelegationTokenToRenew dttr = it.next(); |
| if (dttr.applicationId.equals(applicationId)) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Removing delegation token for appId=" + applicationId + |
| "; token=" + dttr.token.getService()); |
| } |
| |
| // cancel the timer |
| if(dttr.timerTask!=null) |
| dttr.timerTask.cancel(); |
| |
| // cancel the token |
| cancelToken(dttr); |
| |
| it.remove(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Takes care of cancelling app delegation tokens after the configured |
| * cancellation delay, taking into consideration keep-alive requests. |
| * |
| */ |
| private class DelayedTokenRemovalRunnable implements Runnable { |
| |
| private long waitTimeMs; |
| |
| DelayedTokenRemovalRunnable(Configuration conf) { |
| waitTimeMs = |
| conf.getLong( |
| YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS, |
| YarnConfiguration.DEFAULT_RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS); |
| } |
| |
| @Override |
| public void run() { |
| List<ApplicationId> toCancel = new ArrayList<ApplicationId>(); |
| while (!Thread.currentThread().isInterrupted()) { |
| Iterator<Entry<ApplicationId, Long>> it = |
| delayedRemovalMap.entrySet().iterator(); |
| toCancel.clear(); |
| while (it.hasNext()) { |
| Entry<ApplicationId, Long> e = it.next(); |
| if (e.getValue() < System.currentTimeMillis()) { |
| toCancel.add(e.getKey()); |
| } |
| } |
| for (ApplicationId appId : toCancel) { |
| removeApplicationFromRenewal(appId); |
| delayedRemovalMap.remove(appId); |
| } |
| synchronized (this) { |
| try { |
| wait(waitTimeMs); |
| } catch (InterruptedException e) { |
| LOG.info("Delayed Deletion Thread Interrupted. Shutting it down"); |
| return; |
| } |
| } |
| } |
| } |
| } |
| |
| } |