MAPREDUCE-2452. Makes the cancellation of delegation tokens happen in a separate thread. Contributed by Devaraj Das.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/mapreduce/trunk@1131265 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 493d130..1cf866e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -155,6 +155,9 @@
MAPREDUCE-2494. Order distributed cache deletions by LRU. (Robert Joseph
Evans via cdouglas)
+ MAPREDUCE-2452. Makes the cancellation of delegation tokens happen in a
+ separate thread. (ddas)
+
OPTIMIZATIONS
MAPREDUCE-2026. Make JobTracker.getJobCounters() and
diff --git a/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java b/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
index 16a1111..9e96b55 100644
--- a/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
+++ b/src/java/org/apache/hadoop/mapreduce/security/token/DelegationTokenRenewal.java
@@ -30,6 +30,7 @@
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -49,6 +50,7 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.util.StringUtils;
@InterfaceAudience.Private
@@ -107,10 +109,87 @@
// global single timer (daemon)
private static Timer renewalTimer = new Timer(true);
+ //delegation token canceler thread
+ private static DelegationTokenCancelThread dtCancelThread =
+ new DelegationTokenCancelThread();
+ static {
+ dtCancelThread.start();
+ }
+
+
//managing the list of tokens using Map
// jobId=>List<tokens>
private static Set<DelegationTokenToRenew> delegationTokens =
Collections.synchronizedSet(new HashSet<DelegationTokenToRenew>());
+
+ private static class DelegationTokenCancelThread extends Thread {
+ private static class TokenWithConf {
+ Token<DelegationTokenIdentifier> token;
+ Configuration conf;
+ TokenWithConf(Token<DelegationTokenIdentifier> token,
+ Configuration conf) {
+ this.token = token;
+ this.conf = conf;
+ }
+ }
+ private LinkedBlockingQueue<TokenWithConf> queue =
+ new LinkedBlockingQueue<TokenWithConf>();
+
+ public DelegationTokenCancelThread() {
+ super("Delegation Token Canceler");
+ setDaemon(true);
+ }
+ public void cancelToken(Token<DelegationTokenIdentifier> token,
+ Configuration conf) {
+ TokenWithConf tokenWithConf = new TokenWithConf(token, conf);
+ while (!queue.offer(tokenWithConf)) {
+ LOG.warn("Unable to add token " + token + " for cancellation. " +
+ "Will retry..");
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public void run() {
+ while (true) {
+ TokenWithConf tokenWithConf = null;
+ try {
+ tokenWithConf = queue.take();
+ DistributedFileSystem dfs = null;
+ try {
+ // do it over rpc. For that we need DFS object
+ dfs = getDFSForToken(tokenWithConf.token, tokenWithConf.conf);
+ } catch (Exception e) {
+ LOG.info("couldn't get DFS to cancel. Will retry over HTTPS");
+ dfs = null;
+ }
+
+ if(dfs != null) {
+ dfs.cancelDelegationToken(tokenWithConf.token);
+ } else {
+ cancelDelegationTokenOverHttps(tokenWithConf.token,
+ tokenWithConf.conf);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Canceling token " + tokenWithConf.token.getService() +
+ " for dfs=" + dfs);
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to cancel token " + tokenWithConf.token + " " +
+ StringUtils.stringifyException(e));
+ } catch (InterruptedException ie) {
+ return;
+ } catch (Throwable t) {
+ LOG.warn("Got exception " + StringUtils.stringifyException(t) +
+ ". Exiting..");
+ System.exit(-1);
+ }
+ }
+ }
+ }
//adding token
private static void addTokenToList(DelegationTokenToRenew t) {
delegationTokens.add(t);
@@ -337,24 +416,7 @@
Configuration conf = t.conf;
if(token.getKind().equals(kindHdfs)) {
- DistributedFileSystem dfs = null;
- try {
- // do it over rpc. For that we need DFS object
- dfs = getDFSForToken(token, conf);
- } catch (Exception e) {
- LOG.info("couldn't get DFS to cancel. Will retry over HTTPS");
- dfs = null;
- }
-
- try {
- if(dfs != null) {
- dfs.cancelDelegationToken(token);
- } else {
- cancelDelegationTokenOverHttps(token,conf);
- }
- } catch (Exception e) {
- LOG.warn("Failed to cancel " + token, e);
- }
+ dtCancelThread.cancelToken(token, conf);
}
}
diff --git a/src/test/findbugsExcludeFile.xml b/src/test/findbugsExcludeFile.xml
index b52d4ee..3d576f4 100644
--- a/src/test/findbugsExcludeFile.xml
+++ b/src/test/findbugsExcludeFile.xml
@@ -388,4 +388,9 @@
<Field name="started" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
+ <Match>
+ <Class name="org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal$DelegationTokenCancelThread" />
+ <Method name="run" />
+ <Bug pattern="DM_EXIT" />
+ </Match>
</FindBugsFilter>