Revert "HADOOP-18851: Performance improvement for DelegationTokenSecretManager. (#6001). Contributed by Vikas Kumar."
This reverts commit e283375cdfba409fe4ba948c0f24ed073dcbb383.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
index 8378a47..e218bea 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
@@ -120,12 +120,12 @@
/**
* Access to currentKey is protected by this object lock
*/
- private volatile DelegationKey currentKey;
+ private DelegationKey currentKey;
- private final long keyUpdateInterval;
- private final long tokenMaxLifetime;
- private final long tokenRemoverScanInterval;
- private final long tokenRenewInterval;
+ private long keyUpdateInterval;
+ private long tokenMaxLifetime;
+ private long tokenRemoverScanInterval;
+ private long tokenRenewInterval;
/**
* Whether to store a token's tracking ID in its TokenInformation.
* Can be overridden by a subclass.
@@ -491,18 +491,17 @@
}
@Override
- protected byte[] createPassword(TokenIdent identifier) {
+ protected synchronized byte[] createPassword(TokenIdent identifier) {
int sequenceNum;
long now = Time.now();
sequenceNum = incrementDelegationTokenSeqNum();
identifier.setIssueDate(now);
identifier.setMaxDate(now + tokenMaxLifetime);
- DelegationKey delegationCurrentKey = currentKey;
- identifier.setMasterKeyId(delegationCurrentKey.getKeyId());
+ identifier.setMasterKeyId(currentKey.getKeyId());
identifier.setSequenceNumber(sequenceNum);
LOG.info("Creating password for identifier: " + formatTokenId(identifier)
- + ", currentKey: " + delegationCurrentKey.getKeyId());
- byte[] password = createPassword(identifier.getBytes(), delegationCurrentKey.getKey());
+ + ", currentKey: " + currentKey.getKeyId());
+ byte[] password = createPassword(identifier.getBytes(), currentKey.getKey());
DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now
+ tokenRenewInterval, password, getTrackingIdIfEnabled(identifier));
try {
@@ -527,6 +526,7 @@
*/
protected DelegationTokenInformation checkToken(TokenIdent identifier)
throws InvalidToken {
+ assert Thread.holdsLock(this);
DelegationTokenInformation info = getTokenInfo(identifier);
String err;
if (info == null) {
@@ -546,7 +546,7 @@
}
@Override
- public byte[] retrievePassword(TokenIdent identifier)
+ public synchronized byte[] retrievePassword(TokenIdent identifier)
throws InvalidToken {
return checkToken(identifier).getPassword();
}
@@ -558,7 +558,7 @@
return null;
}
- public String getTokenTrackingId(TokenIdent identifier) {
+ public synchronized String getTokenTrackingId(TokenIdent identifier) {
DelegationTokenInformation info = getTokenInfo(identifier);
if (info == null) {
return null;
@@ -572,7 +572,7 @@
* @param password Password in the token.
* @throws InvalidToken InvalidToken.
*/
- public void verifyToken(TokenIdent identifier, byte[] password)
+ public synchronized void verifyToken(TokenIdent identifier, byte[] password)
throws InvalidToken {
byte[] storedPassword = retrievePassword(identifier);
if (!MessageDigest.isEqual(password, storedPassword)) {
@@ -589,7 +589,7 @@
* @throws InvalidToken if the token is invalid
* @throws AccessControlException if the user can't renew token
*/
- public long renewToken(Token<TokenIdent> token,
+ public synchronized long renewToken(Token<TokenIdent> token,
String renewer) throws InvalidToken, IOException {
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
DataInputStream in = new DataInputStream(buf);
@@ -651,7 +651,7 @@
* @throws InvalidToken for invalid token
* @throws AccessControlException if the user isn't allowed to cancel
*/
- public TokenIdent cancelToken(Token<TokenIdent> token,
+ public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
String canceller) throws IOException {
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
DataInputStream in = new DataInputStream(buf);
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
index 0642d3d..dd2ab3f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
@@ -25,7 +25,6 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;
import org.apache.curator.RetryPolicy;
@@ -153,7 +152,7 @@
private final int seqNumBatchSize;
private int currentSeqNum;
private int currentMaxSeqNum;
- private final ReentrantLock currentSeqNumLock;
+
private final boolean isTokenWatcherEnabled;
public ZKDelegationTokenSecretManager(Configuration conf) {
@@ -169,8 +168,7 @@
ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT);
isTokenWatcherEnabled = conf.getBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED,
ZK_DTSM_TOKEN_WATCHER_ENABLED_DEFAULT);
- this.currentSeqNumLock = new ReentrantLock(true);
-
+
String workPath = conf.get(ZK_DTSM_ZNODE_WORKING_PATH, ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT);
String nameSpace = workPath + "/" + ZK_DTSM_NAMESPACE;
if (CURATOR_TL.get() != null) {
@@ -506,28 +504,24 @@
// The secret manager will keep a local range of seq num which won't be
// seen by peers, so only when the range is exhausted it will ask zk for
// another range again
- try {
- this.currentSeqNumLock.lock();
- if (currentSeqNum >= currentMaxSeqNum) {
- try {
- // after a successful batch request, we can get the range starting point
- currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize);
- currentMaxSeqNum = currentSeqNum + seqNumBatchSize;
- LOG.info("Fetched new range of seq num, from {} to {} ",
- currentSeqNum+1, currentMaxSeqNum);
- } catch (InterruptedException e) {
- // The ExpirationThread is just finishing.. so dont do anything..
- LOG.debug(
- "Thread interrupted while performing token counter increment", e);
- Thread.currentThread().interrupt();
- } catch (Exception e) {
- throw new RuntimeException("Could not increment shared counter !!", e);
- }
+ if (currentSeqNum >= currentMaxSeqNum) {
+ try {
+ // after a successful batch request, we can get the range starting point
+ currentSeqNum = incrSharedCount(delTokSeqCounter, seqNumBatchSize);
+ currentMaxSeqNum = currentSeqNum + seqNumBatchSize;
+ LOG.info("Fetched new range of seq num, from {} to {} ",
+ currentSeqNum+1, currentMaxSeqNum);
+ } catch (InterruptedException e) {
+ // The ExpirationThread is just finishing.. so dont do anything..
+ LOG.debug(
+ "Thread interrupted while performing token counter increment", e);
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ throw new RuntimeException("Could not increment shared counter !!", e);
}
- return ++currentSeqNum;
- } finally {
- this.currentSeqNumLock.unlock();
}
+
+ return ++currentSeqNum;
}
@Override