| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.security.token.delegation; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.ByteArrayOutputStream; |
| import java.io.DataInputStream; |
| import java.io.DataOutputStream; |
| import java.io.IOException; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.TimeUnit; |
| |
| import javax.security.auth.login.AppConfigurationEntry; |
| |
| import org.apache.curator.ensemble.fixed.FixedEnsembleProvider; |
| import org.apache.curator.framework.CuratorFramework; |
| import org.apache.curator.framework.CuratorFrameworkFactory; |
| import org.apache.curator.framework.CuratorFrameworkFactory.Builder; |
| import org.apache.curator.framework.api.ACLProvider; |
| import org.apache.curator.framework.imps.DefaultACLProvider; |
| import org.apache.curator.framework.recipes.cache.ChildData; |
| import org.apache.curator.framework.recipes.cache.PathChildrenCache; |
| import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode; |
| import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; |
| import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; |
| import org.apache.curator.framework.recipes.shared.SharedCount; |
| import org.apache.curator.framework.recipes.shared.VersionedValue; |
| import org.apache.curator.retry.RetryNTimes; |
| import org.apache.curator.utils.EnsurePath; |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceAudience.Private; |
| import org.apache.hadoop.classification.InterfaceStability.Unstable; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager; |
| import org.apache.zookeeper.CreateMode; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.KeeperException.NoNodeException; |
| import org.apache.zookeeper.ZooDefs.Perms; |
| import org.apache.zookeeper.client.ZooKeeperSaslClient; |
| import org.apache.zookeeper.data.ACL; |
| import org.apache.zookeeper.data.Id; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| |
| /** |
| * An implementation of {@link AbstractDelegationTokenSecretManager} that |
| * persists TokenIdentifiers and DelegationKeys in Zookeeper. This class can |
| * be used by HA (Highly available) services that consists of multiple nodes. |
| * This class ensures that Identifiers and Keys are replicated to all nodes of |
| * the service. |
| */ |
| @InterfaceAudience.Private |
| public abstract class ZKDelegationTokenSecretManager<TokenIdent extends AbstractDelegationTokenIdentifier> |
| extends AbstractDelegationTokenSecretManager<TokenIdent> { |
| |
| private static final String ZK_CONF_PREFIX = "zk-dt-secret-manager."; |
| public static final String ZK_DTSM_ZK_NUM_RETRIES = ZK_CONF_PREFIX |
| + "zkNumRetries"; |
| public static final String ZK_DTSM_ZK_SESSION_TIMEOUT = ZK_CONF_PREFIX |
| + "zkSessionTimeout"; |
| public static final String ZK_DTSM_ZK_CONNECTION_TIMEOUT = ZK_CONF_PREFIX |
| + "zkConnectionTimeout"; |
| public static final String ZK_DTSM_ZK_SHUTDOWN_TIMEOUT = ZK_CONF_PREFIX |
| + "zkShutdownTimeout"; |
| public static final String ZK_DTSM_ZNODE_WORKING_PATH = ZK_CONF_PREFIX |
| + "znodeWorkingPath"; |
| public static final String ZK_DTSM_ZK_AUTH_TYPE = ZK_CONF_PREFIX |
| + "zkAuthType"; |
| public static final String ZK_DTSM_ZK_CONNECTION_STRING = ZK_CONF_PREFIX |
| + "zkConnectionString"; |
| public static final String ZK_DTSM_ZK_KERBEROS_KEYTAB = ZK_CONF_PREFIX |
| + "kerberos.keytab"; |
| public static final String ZK_DTSM_ZK_KERBEROS_PRINCIPAL = ZK_CONF_PREFIX |
| + "kerberos.principal"; |
| |
| public static final int ZK_DTSM_ZK_NUM_RETRIES_DEFAULT = 3; |
| public static final int ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT = 10000; |
| public static final int ZK_DTSM_ZK_CONNECTION_TIMEOUT_DEFAULT = 10000; |
| public static final int ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT = 10000; |
| public static final String ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT = "zkdtsm"; |
| |
| private static Logger LOG = LoggerFactory |
| .getLogger(ZKDelegationTokenSecretManager.class); |
| |
| private static final String JAAS_LOGIN_ENTRY_NAME = |
| "ZKDelegationTokenSecretManagerClient"; |
| |
| private static final String ZK_DTSM_NAMESPACE = "ZKDTSMRoot"; |
| private static final String ZK_DTSM_SEQNUM_ROOT = "/ZKDTSMSeqNumRoot"; |
| private static final String ZK_DTSM_KEYID_ROOT = "/ZKDTSMKeyIdRoot"; |
| private static final String ZK_DTSM_TOKENS_ROOT = "/ZKDTSMTokensRoot"; |
| private static final String ZK_DTSM_MASTER_KEY_ROOT = "/ZKDTSMMasterKeyRoot"; |
| |
| private static final String DELEGATION_KEY_PREFIX = "DK_"; |
| private static final String DELEGATION_TOKEN_PREFIX = "DT_"; |
| |
| private static final ThreadLocal<CuratorFramework> CURATOR_TL = |
| new ThreadLocal<CuratorFramework>(); |
| |
| public static void setCurator(CuratorFramework curator) { |
| CURATOR_TL.set(curator); |
| } |
| |
| private final boolean isExternalClient; |
| private final CuratorFramework zkClient; |
| private SharedCount delTokSeqCounter; |
| private SharedCount keyIdSeqCounter; |
| private PathChildrenCache keyCache; |
| private PathChildrenCache tokenCache; |
| private ExecutorService listenerThreadPool; |
| private final long shutdownTimeout; |
| |
| public ZKDelegationTokenSecretManager(Configuration conf) { |
| super(conf.getLong(DelegationTokenManager.UPDATE_INTERVAL, |
| DelegationTokenManager.UPDATE_INTERVAL_DEFAULT) * 1000, |
| conf.getLong(DelegationTokenManager.MAX_LIFETIME, |
| DelegationTokenManager.MAX_LIFETIME_DEFAULT) * 1000, |
| conf.getLong(DelegationTokenManager.RENEW_INTERVAL, |
| DelegationTokenManager.RENEW_INTERVAL_DEFAULT) * 1000, |
| conf.getLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, |
| DelegationTokenManager.REMOVAL_SCAN_INTERVAL_DEFAULT) * 1000); |
| shutdownTimeout = conf.getLong(ZK_DTSM_ZK_SHUTDOWN_TIMEOUT, |
| ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT); |
| if (CURATOR_TL.get() != null) { |
| zkClient = |
| CURATOR_TL.get().usingNamespace( |
| conf.get(ZK_DTSM_ZNODE_WORKING_PATH, |
| ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT) |
| + "/" + ZK_DTSM_NAMESPACE); |
| isExternalClient = true; |
| } else { |
| String connString = conf.get(ZK_DTSM_ZK_CONNECTION_STRING); |
| Preconditions.checkNotNull(connString, |
| "Zookeeper connection string cannot be null"); |
| String authType = conf.get(ZK_DTSM_ZK_AUTH_TYPE); |
| |
| // AuthType has to be explicitly set to 'none' or 'sasl' |
| Preconditions.checkNotNull(authType, "Zookeeper authType cannot be null !!"); |
| Preconditions.checkArgument( |
| authType.equals("sasl") || authType.equals("none"), |
| "Zookeeper authType must be one of [none, sasl]"); |
| |
| Builder builder = null; |
| try { |
| ACLProvider aclProvider = null; |
| if (authType.equals("sasl")) { |
| LOG.info("Connecting to ZooKeeper with SASL/Kerberos" |
| + "and using 'sasl' ACLs"); |
| String principal = setJaasConfiguration(conf); |
| System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY, |
| JAAS_LOGIN_ENTRY_NAME); |
| System.setProperty("zookeeper.authProvider.1", |
| "org.apache.zookeeper.server.auth.SASLAuthenticationProvider"); |
| aclProvider = new SASLOwnerACLProvider(principal); |
| } else { // "none" |
| LOG.info("Connecting to ZooKeeper without authentication"); |
| aclProvider = new DefaultACLProvider(); // open to everyone |
| } |
| int sessionT = |
| conf.getInt(ZK_DTSM_ZK_SESSION_TIMEOUT, |
| ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT); |
| int numRetries = |
| conf.getInt(ZK_DTSM_ZK_NUM_RETRIES, ZK_DTSM_ZK_NUM_RETRIES_DEFAULT); |
| builder = |
| CuratorFrameworkFactory |
| .builder() |
| .aclProvider(aclProvider) |
| .namespace( |
| conf.get(ZK_DTSM_ZNODE_WORKING_PATH, |
| ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT) |
| + "/" |
| + ZK_DTSM_NAMESPACE |
| ) |
| .sessionTimeoutMs(sessionT) |
| .connectionTimeoutMs( |
| conf.getInt(ZK_DTSM_ZK_CONNECTION_TIMEOUT, |
| ZK_DTSM_ZK_CONNECTION_TIMEOUT_DEFAULT) |
| ) |
| .retryPolicy( |
| new RetryNTimes(numRetries, sessionT / numRetries)); |
| } catch (Exception ex) { |
| throw new RuntimeException("Could not Load ZK acls or auth"); |
| } |
| zkClient = builder.ensembleProvider(new FixedEnsembleProvider(connString)) |
| .build(); |
| isExternalClient = false; |
| } |
| } |
| |
| private String setJaasConfiguration(Configuration config) throws Exception { |
| String keytabFile = |
| config.get(ZK_DTSM_ZK_KERBEROS_KEYTAB, "").trim(); |
| if (keytabFile == null || keytabFile.length() == 0) { |
| throw new IllegalArgumentException(ZK_DTSM_ZK_KERBEROS_KEYTAB |
| + " must be specified"); |
| } |
| String principal = |
| config.get(ZK_DTSM_ZK_KERBEROS_PRINCIPAL, "").trim(); |
| if (principal == null || principal.length() == 0) { |
| throw new IllegalArgumentException(ZK_DTSM_ZK_KERBEROS_PRINCIPAL |
| + " must be specified"); |
| } |
| |
| JaasConfiguration jConf = |
| new JaasConfiguration(JAAS_LOGIN_ENTRY_NAME, principal, keytabFile); |
| javax.security.auth.login.Configuration.setConfiguration(jConf); |
| return principal.split("[/@]")[0]; |
| } |
| |
| /** |
| * Creates a programmatic version of a jaas.conf file. This can be used |
| * instead of writing a jaas.conf file and setting the system property, |
| * "java.security.auth.login.config", to point to that file. It is meant to be |
| * used for connecting to ZooKeeper. |
| */ |
| @InterfaceAudience.Private |
| public static class JaasConfiguration extends |
| javax.security.auth.login.Configuration { |
| |
| private final javax.security.auth.login.Configuration baseConfig = |
| javax.security.auth.login.Configuration.getConfiguration(); |
| private static AppConfigurationEntry[] entry; |
| private String entryName; |
| |
| /** |
| * Add an entry to the jaas configuration with the passed in name, |
| * principal, and keytab. The other necessary options will be set for you. |
| * |
| * @param entryName |
| * The name of the entry (e.g. "Client") |
| * @param principal |
| * The principal of the user |
| * @param keytab |
| * The location of the keytab |
| */ |
| public JaasConfiguration(String entryName, String principal, String keytab) { |
| this.entryName = entryName; |
| Map<String, String> options = new HashMap<String, String>(); |
| options.put("keyTab", keytab); |
| options.put("principal", principal); |
| options.put("useKeyTab", "true"); |
| options.put("storeKey", "true"); |
| options.put("useTicketCache", "false"); |
| options.put("refreshKrb5Config", "true"); |
| String jaasEnvVar = System.getenv("HADOOP_JAAS_DEBUG"); |
| if (jaasEnvVar != null && "true".equalsIgnoreCase(jaasEnvVar)) { |
| options.put("debug", "true"); |
| } |
| entry = new AppConfigurationEntry[] { |
| new AppConfigurationEntry(getKrb5LoginModuleName(), |
| AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, |
| options) }; |
| } |
| |
| @Override |
| public AppConfigurationEntry[] getAppConfigurationEntry(String name) { |
| return (entryName.equals(name)) ? entry : ((baseConfig != null) |
| ? baseConfig.getAppConfigurationEntry(name) : null); |
| } |
| |
| private String getKrb5LoginModuleName() { |
| String krb5LoginModuleName; |
| if (System.getProperty("java.vendor").contains("IBM")) { |
| krb5LoginModuleName = "com.ibm.security.auth.module.Krb5LoginModule"; |
| } else { |
| krb5LoginModuleName = "com.sun.security.auth.module.Krb5LoginModule"; |
| } |
| return krb5LoginModuleName; |
| } |
| } |
| |
| @Override |
| public void startThreads() throws IOException { |
| if (!isExternalClient) { |
| try { |
| zkClient.start(); |
| } catch (Exception e) { |
| throw new IOException("Could not start Curator Framework", e); |
| } |
| } else { |
| // If namespace parents are implicitly created, they won't have ACLs. |
| // So, let's explicitly create them. |
| CuratorFramework nullNsFw = zkClient.usingNamespace(null); |
| EnsurePath ensureNs = |
| nullNsFw.newNamespaceAwareEnsurePath("/" + zkClient.getNamespace()); |
| try { |
| ensureNs.ensure(nullNsFw.getZookeeperClient()); |
| } catch (Exception e) { |
| throw new IOException("Could not create namespace", e); |
| } |
| } |
| listenerThreadPool = Executors.newSingleThreadExecutor(); |
| try { |
| delTokSeqCounter = new SharedCount(zkClient, ZK_DTSM_SEQNUM_ROOT, 0); |
| if (delTokSeqCounter != null) { |
| delTokSeqCounter.start(); |
| } |
| } catch (Exception e) { |
| throw new IOException("Could not start Sequence Counter", e); |
| } |
| try { |
| keyIdSeqCounter = new SharedCount(zkClient, ZK_DTSM_KEYID_ROOT, 0); |
| if (keyIdSeqCounter != null) { |
| keyIdSeqCounter.start(); |
| } |
| } catch (Exception e) { |
| throw new IOException("Could not start KeyId Counter", e); |
| } |
| try { |
| createPersistentNode(ZK_DTSM_MASTER_KEY_ROOT); |
| createPersistentNode(ZK_DTSM_TOKENS_ROOT); |
| } catch (Exception e) { |
| throw new RuntimeException("Could not create ZK paths"); |
| } |
| try { |
| keyCache = new PathChildrenCache(zkClient, ZK_DTSM_MASTER_KEY_ROOT, true); |
| if (keyCache != null) { |
| keyCache.start(StartMode.BUILD_INITIAL_CACHE); |
| keyCache.getListenable().addListener(new PathChildrenCacheListener() { |
| @Override |
| public void childEvent(CuratorFramework client, |
| PathChildrenCacheEvent event) |
| throws Exception { |
| switch (event.getType()) { |
| case CHILD_ADDED: |
| processKeyAddOrUpdate(event.getData().getData()); |
| break; |
| case CHILD_UPDATED: |
| processKeyAddOrUpdate(event.getData().getData()); |
| break; |
| case CHILD_REMOVED: |
| processKeyRemoved(event.getData().getPath()); |
| break; |
| default: |
| break; |
| } |
| } |
| }, listenerThreadPool); |
| loadFromZKCache(false); |
| } |
| } catch (Exception e) { |
| throw new IOException("Could not start PathChildrenCache for keys", e); |
| } |
| try { |
| tokenCache = new PathChildrenCache(zkClient, ZK_DTSM_TOKENS_ROOT, true); |
| if (tokenCache != null) { |
| tokenCache.start(StartMode.BUILD_INITIAL_CACHE); |
| tokenCache.getListenable().addListener(new PathChildrenCacheListener() { |
| |
| @Override |
| public void childEvent(CuratorFramework client, |
| PathChildrenCacheEvent event) throws Exception { |
| switch (event.getType()) { |
| case CHILD_ADDED: |
| processTokenAddOrUpdate(event.getData()); |
| break; |
| case CHILD_UPDATED: |
| processTokenAddOrUpdate(event.getData()); |
| break; |
| case CHILD_REMOVED: |
| processTokenRemoved(event.getData()); |
| break; |
| default: |
| break; |
| } |
| } |
| }, listenerThreadPool); |
| loadFromZKCache(true); |
| } |
| } catch (Exception e) { |
| throw new IOException("Could not start PathChildrenCache for tokens", e); |
| } |
| super.startThreads(); |
| } |
| |
| /** |
| * Load the PathChildrenCache into the in-memory map. Possible caches to be |
| * loaded are keyCache and tokenCache. |
| * |
| * @param isTokenCache true if loading tokenCache, false if loading keyCache. |
| */ |
| private void loadFromZKCache(final boolean isTokenCache) { |
| final String cacheName = isTokenCache ? "token" : "key"; |
| LOG.info("Starting to load {} cache.", cacheName); |
| final List<ChildData> children; |
| if (isTokenCache) { |
| children = tokenCache.getCurrentData(); |
| } else { |
| children = keyCache.getCurrentData(); |
| } |
| |
| int count = 0; |
| for (ChildData child : children) { |
| try { |
| if (isTokenCache) { |
| processTokenAddOrUpdate(child); |
| } else { |
| processKeyAddOrUpdate(child.getData()); |
| } |
| } catch (Exception e) { |
| LOG.info("Ignoring node {} because it failed to load.", |
| child.getPath()); |
| LOG.debug("Failure exception:", e); |
| ++count; |
| } |
| } |
| if (count > 0) { |
| LOG.warn("Ignored {} nodes while loading {} cache.", count, cacheName); |
| } |
| LOG.info("Loaded {} cache.", cacheName); |
| } |
| |
| private void processKeyAddOrUpdate(byte[] data) throws IOException { |
| ByteArrayInputStream bin = new ByteArrayInputStream(data); |
| DataInputStream din = new DataInputStream(bin); |
| DelegationKey key = new DelegationKey(); |
| key.readFields(din); |
| synchronized (this) { |
| allKeys.put(key.getKeyId(), key); |
| } |
| } |
| |
| private void processKeyRemoved(String path) { |
| int i = path.lastIndexOf('/'); |
| if (i > 0) { |
| String tokSeg = path.substring(i + 1); |
| int j = tokSeg.indexOf('_'); |
| if (j > 0) { |
| int keyId = Integer.parseInt(tokSeg.substring(j + 1)); |
| synchronized (this) { |
| allKeys.remove(keyId); |
| } |
| } |
| } |
| } |
| |
| private void processTokenAddOrUpdate(ChildData data) throws IOException { |
| ByteArrayInputStream bin = new ByteArrayInputStream(data.getData()); |
| DataInputStream din = new DataInputStream(bin); |
| TokenIdent ident = createIdentifier(); |
| ident.readFields(din); |
| long renewDate = din.readLong(); |
| int pwdLen = din.readInt(); |
| byte[] password = new byte[pwdLen]; |
| int numRead = din.read(password, 0, pwdLen); |
| if (numRead > -1) { |
| DelegationTokenInformation tokenInfo = |
| new DelegationTokenInformation(renewDate, password); |
| synchronized (this) { |
| currentTokens.put(ident, tokenInfo); |
| // The cancel task might be waiting |
| notifyAll(); |
| } |
| } |
| } |
| |
| private void processTokenRemoved(ChildData data) throws IOException { |
| ByteArrayInputStream bin = new ByteArrayInputStream(data.getData()); |
| DataInputStream din = new DataInputStream(bin); |
| TokenIdent ident = createIdentifier(); |
| ident.readFields(din); |
| synchronized (this) { |
| currentTokens.remove(ident); |
| // The cancel task might be waiting |
| notifyAll(); |
| } |
| } |
| |
| @Override |
| public void stopThreads() { |
| super.stopThreads(); |
| try { |
| if (tokenCache != null) { |
| tokenCache.close(); |
| } |
| } catch (Exception e) { |
| LOG.error("Could not stop Delegation Token Cache", e); |
| } |
| try { |
| if (delTokSeqCounter != null) { |
| delTokSeqCounter.close(); |
| } |
| } catch (Exception e) { |
| LOG.error("Could not stop Delegation Token Counter", e); |
| } |
| try { |
| if (keyIdSeqCounter != null) { |
| keyIdSeqCounter.close(); |
| } |
| } catch (Exception e) { |
| LOG.error("Could not stop Key Id Counter", e); |
| } |
| try { |
| if (keyCache != null) { |
| keyCache.close(); |
| } |
| } catch (Exception e) { |
| LOG.error("Could not stop KeyCache", e); |
| } |
| try { |
| if (!isExternalClient && (zkClient != null)) { |
| zkClient.close(); |
| } |
| } catch (Exception e) { |
| LOG.error("Could not stop Curator Framework", e); |
| } |
| if (listenerThreadPool != null) { |
| listenerThreadPool.shutdown(); |
| try { |
| // wait for existing tasks to terminate |
| if (!listenerThreadPool.awaitTermination(shutdownTimeout, |
| TimeUnit.MILLISECONDS)) { |
| LOG.error("Forcing Listener threadPool to shutdown !!"); |
| listenerThreadPool.shutdownNow(); |
| } |
| } catch (InterruptedException ie) { |
| listenerThreadPool.shutdownNow(); |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| |
| private void createPersistentNode(String nodePath) throws Exception { |
| try { |
| zkClient.create().withMode(CreateMode.PERSISTENT).forPath(nodePath); |
| } catch (KeeperException.NodeExistsException ne) { |
| LOG.debug(nodePath + " znode already exists !!"); |
| } catch (Exception e) { |
| throw new IOException(nodePath + " znode could not be created !!", e); |
| } |
| } |
| |
| @Override |
| protected int getDelegationTokenSeqNum() { |
| return delTokSeqCounter.getCount(); |
| } |
| |
| private void incrSharedCount(SharedCount sharedCount) throws Exception { |
| while (true) { |
| // Loop until we successfully increment the counter |
| VersionedValue<Integer> versionedValue = sharedCount.getVersionedValue(); |
| if (sharedCount.trySetCount(versionedValue, versionedValue.getValue() + 1)) { |
| break; |
| } |
| } |
| } |
| |
| @Override |
| protected int incrementDelegationTokenSeqNum() { |
| try { |
| incrSharedCount(delTokSeqCounter); |
| } catch (InterruptedException e) { |
| // The ExpirationThread is just finishing.. so dont do anything.. |
| LOG.debug("Thread interrupted while performing token counter increment", e); |
| Thread.currentThread().interrupt(); |
| } catch (Exception e) { |
| throw new RuntimeException("Could not increment shared counter !!", e); |
| } |
| return delTokSeqCounter.getCount(); |
| } |
| |
| @Override |
| protected void setDelegationTokenSeqNum(int seqNum) { |
| try { |
| delTokSeqCounter.setCount(seqNum); |
| } catch (Exception e) { |
| throw new RuntimeException("Could not set shared counter !!", e); |
| } |
| } |
| |
| @Override |
| protected int getCurrentKeyId() { |
| return keyIdSeqCounter.getCount(); |
| } |
| |
| @Override |
| protected int incrementCurrentKeyId() { |
| try { |
| incrSharedCount(keyIdSeqCounter); |
| } catch (InterruptedException e) { |
| // The ExpirationThread is just finishing.. so dont do anything.. |
| LOG.debug("Thread interrupted while performing keyId increment", e); |
| Thread.currentThread().interrupt(); |
| } catch (Exception e) { |
| throw new RuntimeException("Could not increment shared keyId counter !!", e); |
| } |
| return keyIdSeqCounter.getCount(); |
| } |
| |
| @Override |
| protected DelegationKey getDelegationKey(int keyId) { |
| // First check if its I already have this key |
| DelegationKey key = allKeys.get(keyId); |
| // Then query ZK |
| if (key == null) { |
| try { |
| key = getKeyFromZK(keyId); |
| if (key != null) { |
| allKeys.put(keyId, key); |
| } |
| } catch (IOException e) { |
| LOG.error("Error retrieving key [" + keyId + "] from ZK", e); |
| } |
| } |
| return key; |
| } |
| |
| private DelegationKey getKeyFromZK(int keyId) throws IOException { |
| String nodePath = |
| getNodePath(ZK_DTSM_MASTER_KEY_ROOT, DELEGATION_KEY_PREFIX + keyId); |
| try { |
| byte[] data = zkClient.getData().forPath(nodePath); |
| if ((data == null) || (data.length == 0)) { |
| return null; |
| } |
| ByteArrayInputStream bin = new ByteArrayInputStream(data); |
| DataInputStream din = new DataInputStream(bin); |
| DelegationKey key = new DelegationKey(); |
| key.readFields(din); |
| return key; |
| } catch (KeeperException.NoNodeException e) { |
| LOG.error("No node in path [" + nodePath + "]"); |
| } catch (Exception ex) { |
| throw new IOException(ex); |
| } |
| return null; |
| } |
| |
| @Override |
| protected DelegationTokenInformation getTokenInfo(TokenIdent ident) { |
| // First check if I have this.. |
| DelegationTokenInformation tokenInfo = currentTokens.get(ident); |
| // Then query ZK |
| if (tokenInfo == null) { |
| try { |
| tokenInfo = getTokenInfoFromZK(ident); |
| if (tokenInfo != null) { |
| currentTokens.put(ident, tokenInfo); |
| } |
| } catch (IOException e) { |
| LOG.error("Error retrieving tokenInfo [" + ident.getSequenceNumber() |
| + "] from ZK", e); |
| } |
| } |
| return tokenInfo; |
| } |
| |
| /** |
| * This method synchronizes the state of a delegation token information in |
| * local cache with its actual value in Zookeeper. |
| * |
| * @param ident Identifier of the token |
| */ |
| private synchronized void syncLocalCacheWithZk(TokenIdent ident) { |
| try { |
| DelegationTokenInformation tokenInfo = getTokenInfoFromZK(ident); |
| if (tokenInfo != null && !currentTokens.containsKey(ident)) { |
| currentTokens.put(ident, tokenInfo); |
| } else if (tokenInfo == null && currentTokens.containsKey(ident)) { |
| currentTokens.remove(ident); |
| } |
| } catch (IOException e) { |
| LOG.error("Error retrieving tokenInfo [" + ident.getSequenceNumber() |
| + "] from ZK", e); |
| } |
| } |
| |
| private DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident) |
| throws IOException { |
| return getTokenInfoFromZK(ident, false); |
| } |
| |
| private DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident, |
| boolean quiet) throws IOException { |
| String nodePath = |
| getNodePath(ZK_DTSM_TOKENS_ROOT, |
| DELEGATION_TOKEN_PREFIX + ident.getSequenceNumber()); |
| try { |
| byte[] data = zkClient.getData().forPath(nodePath); |
| if ((data == null) || (data.length == 0)) { |
| return null; |
| } |
| ByteArrayInputStream bin = new ByteArrayInputStream(data); |
| DataInputStream din = new DataInputStream(bin); |
| createIdentifier().readFields(din); |
| long renewDate = din.readLong(); |
| int pwdLen = din.readInt(); |
| byte[] password = new byte[pwdLen]; |
| int numRead = din.read(password, 0, pwdLen); |
| if (numRead > -1) { |
| DelegationTokenInformation tokenInfo = |
| new DelegationTokenInformation(renewDate, password); |
| return tokenInfo; |
| } |
| } catch (KeeperException.NoNodeException e) { |
| if (!quiet) { |
| LOG.error("No node in path [" + nodePath + "]"); |
| } |
| } catch (Exception ex) { |
| throw new IOException(ex); |
| } |
| return null; |
| } |
| |
| @Override |
| protected void storeDelegationKey(DelegationKey key) throws IOException { |
| addOrUpdateDelegationKey(key, false); |
| } |
| |
| @Override |
| protected void updateDelegationKey(DelegationKey key) throws IOException { |
| addOrUpdateDelegationKey(key, true); |
| } |
| |
| private void addOrUpdateDelegationKey(DelegationKey key, boolean isUpdate) |
| throws IOException { |
| String nodeCreatePath = |
| getNodePath(ZK_DTSM_MASTER_KEY_ROOT, |
| DELEGATION_KEY_PREFIX + key.getKeyId()); |
| ByteArrayOutputStream os = new ByteArrayOutputStream(); |
| DataOutputStream fsOut = new DataOutputStream(os); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Storing ZKDTSMDelegationKey_" + key.getKeyId()); |
| } |
| key.write(fsOut); |
| try { |
| if (zkClient.checkExists().forPath(nodeCreatePath) != null) { |
| zkClient.setData().forPath(nodeCreatePath, os.toByteArray()) |
| .setVersion(-1); |
| if (!isUpdate) { |
| LOG.debug("Key with path [" + nodeCreatePath |
| + "] already exists.. Updating !!"); |
| } |
| } else { |
| zkClient.create().withMode(CreateMode.PERSISTENT) |
| .forPath(nodeCreatePath, os.toByteArray()); |
| if (isUpdate) { |
| LOG.debug("Updating non existent Key path [" + nodeCreatePath |
| + "].. Adding new !!"); |
| } |
| } |
| } catch (KeeperException.NodeExistsException ne) { |
| LOG.debug(nodeCreatePath + " znode already exists !!"); |
| } catch (Exception ex) { |
| throw new IOException(ex); |
| } finally { |
| os.close(); |
| } |
| } |
| |
| @Override |
| protected void removeStoredMasterKey(DelegationKey key) { |
| String nodeRemovePath = |
| getNodePath(ZK_DTSM_MASTER_KEY_ROOT, |
| DELEGATION_KEY_PREFIX + key.getKeyId()); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Removing ZKDTSMDelegationKey_" + key.getKeyId()); |
| } |
| try { |
| if (zkClient.checkExists().forPath(nodeRemovePath) != null) { |
| while(zkClient.checkExists().forPath(nodeRemovePath) != null){ |
| try { |
| zkClient.delete().guaranteed().forPath(nodeRemovePath); |
| } catch (NoNodeException nne) { |
| // It is possible that the node might be deleted between the |
| // check and the actual delete.. which might lead to an |
| // exception that can bring down the daemon running this |
| // SecretManager |
| LOG.debug("Node already deleted by peer " + nodeRemovePath); |
| } |
| } |
| } else { |
| LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath); |
| } |
| } catch (Exception e) { |
| LOG.debug(nodeRemovePath + " znode could not be removed!!"); |
| } |
| } |
| |
| @Override |
| protected void storeToken(TokenIdent ident, |
| DelegationTokenInformation tokenInfo) throws IOException { |
| try { |
| addOrUpdateToken(ident, tokenInfo, false); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Override |
| protected void updateToken(TokenIdent ident, |
| DelegationTokenInformation tokenInfo) throws IOException { |
| String nodeRemovePath = |
| getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX |
| + ident.getSequenceNumber()); |
| try { |
| if (zkClient.checkExists().forPath(nodeRemovePath) != null) { |
| addOrUpdateToken(ident, tokenInfo, true); |
| } else { |
| addOrUpdateToken(ident, tokenInfo, false); |
| LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath); |
| } |
| } catch (Exception e) { |
| throw new RuntimeException("Could not update Stored Token ZKDTSMDelegationToken_" |
| + ident.getSequenceNumber(), e); |
| } |
| } |
| |
| @Override |
| protected void removeStoredToken(TokenIdent ident) |
| throws IOException { |
| String nodeRemovePath = |
| getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX |
| + ident.getSequenceNumber()); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Removing ZKDTSMDelegationToken_" |
| + ident.getSequenceNumber()); |
| } |
| try { |
| if (zkClient.checkExists().forPath(nodeRemovePath) != null) { |
| while(zkClient.checkExists().forPath(nodeRemovePath) != null){ |
| try { |
| zkClient.delete().guaranteed().forPath(nodeRemovePath); |
| } catch (NoNodeException nne) { |
| // It is possible that the node might be deleted between the |
| // check and the actual delete.. which might lead to an |
| // exception that can bring down the daemon running this |
| // SecretManager |
| LOG.debug("Node already deleted by peer " + nodeRemovePath); |
| } |
| } |
| } else { |
| LOG.debug("Attempted to remove a non-existing znode " + nodeRemovePath); |
| } |
| } catch (Exception e) { |
| throw new RuntimeException( |
| "Could not remove Stored Token ZKDTSMDelegationToken_" |
| + ident.getSequenceNumber(), e); |
| } |
| } |
| |
| @Override |
| public synchronized TokenIdent cancelToken(Token<TokenIdent> token, |
| String canceller) throws IOException { |
| ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier()); |
| DataInputStream in = new DataInputStream(buf); |
| TokenIdent id = createIdentifier(); |
| id.readFields(in); |
| |
| syncLocalCacheWithZk(id); |
| return super.cancelToken(token, canceller); |
| } |
| |
| private void addOrUpdateToken(TokenIdent ident, |
| DelegationTokenInformation info, boolean isUpdate) throws Exception { |
| String nodeCreatePath = |
| getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX |
| + ident.getSequenceNumber()); |
| ByteArrayOutputStream tokenOs = new ByteArrayOutputStream(); |
| DataOutputStream tokenOut = new DataOutputStream(tokenOs); |
| ByteArrayOutputStream seqOs = new ByteArrayOutputStream(); |
| |
| try { |
| ident.write(tokenOut); |
| tokenOut.writeLong(info.getRenewDate()); |
| tokenOut.writeInt(info.getPassword().length); |
| tokenOut.write(info.getPassword()); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug((isUpdate ? "Updating " : "Storing ") |
| + "ZKDTSMDelegationToken_" + |
| ident.getSequenceNumber()); |
| } |
| if (isUpdate) { |
| zkClient.setData().forPath(nodeCreatePath, tokenOs.toByteArray()) |
| .setVersion(-1); |
| } else { |
| zkClient.create().withMode(CreateMode.PERSISTENT) |
| .forPath(nodeCreatePath, tokenOs.toByteArray()); |
| } |
| } finally { |
| seqOs.close(); |
| } |
| } |
| |
| /** |
| * Simple implementation of an {@link ACLProvider} that simply returns an ACL |
| * that gives all permissions only to a single principal. |
| */ |
| private static class SASLOwnerACLProvider implements ACLProvider { |
| |
| private final List<ACL> saslACL; |
| |
| private SASLOwnerACLProvider(String principal) { |
| this.saslACL = Collections.singletonList( |
| new ACL(Perms.ALL, new Id("sasl", principal))); |
| } |
| |
| @Override |
| public List<ACL> getDefaultAcl() { |
| return saslACL; |
| } |
| |
| @Override |
| public List<ACL> getAclForPath(String path) { |
| return saslACL; |
| } |
| } |
| |
| @VisibleForTesting |
| @Private |
| @Unstable |
| static String getNodePath(String root, String nodeName) { |
| return (root + "/" + nodeName); |
| } |
| |
| @VisibleForTesting |
| public ExecutorService getListenerThreadPool() { |
| return listenerThreadPool; |
| } |
| |
| @VisibleForTesting |
| DelegationTokenInformation getTokenInfoFromMemory(TokenIdent ident) { |
| return currentTokens.get(ident); |
| } |
| } |