blob: b94e85bab45a374c8d7eb0c59c2773ce243700f9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.utils;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.Caching;
import javax.cache.configuration.CompleteConfiguration;
import javax.cache.configuration.FactoryBuilder;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.expiry.CreatedExpiryPolicy;
import javax.cache.expiry.Duration;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.integration.CacheLoader;
import javax.cache.integration.CacheLoaderException;
import javax.cache.spi.CachingProvider;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyRequest;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKeyResponse;
import org.apache.hadoop.yarn.server.federation.store.records.RouterMasterKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.VisibleForTesting;
import com.zaxxer.hikari.pool.HikariPool.PoolInitializationException;
/**
*
* The FederationStateStoreFacade is a utility wrapper that provides singleton
* access to the Federation state store. It abstracts out retries and, in
* addition, implements caching for various objects.
*
*/
public final class FederationStateStoreFacade {
// Declared before FACADE: static fields initialize in textual order and the
// constructor path may log through LOG while FACADE is being created.
private static final Logger LOG =
LoggerFactory.getLogger(FederationStateStoreFacade.class);
// Method-name components of the cache keys assembled by buildCacheKey, one
// per cached lookup.
private static final String GET_SUBCLUSTERS_CACHEID = "getSubClusters";
private static final String GET_POLICIES_CONFIGURATIONS_CACHEID =
"getPoliciesConfigurations";
private static final String GET_APPLICATION_HOME_SUBCLUSTER_CACHEID =
"getApplicationHomeSubCluster";
// The single instance returned by getInstance(), created when the class is
// initialized.
private static final FederationStateStoreFacade FACADE =
new FederationStateStoreFacade();
// State store client: a retry proxy built by initializeFacadeInternal, or the
// instance handed to reinitialize.
private FederationStateStore stateStore;
// Cache entry lifetime in seconds, read from the configuration by initCache;
// caching is treated as disabled unless this is greater than zero.
private int cacheTimeToLive;
// Configuration currently in effect (set at construction and by reinitialize).
private Configuration conf;
// JCache instance assigned by initCache when caching is enabled; set to null
// by clearCache.
private Cache<Object, Object> cache;
// Resolver created and loaded once by initializeFacadeInternal.
private SubClusterResolver subclusterResolver;
private FederationStateStoreFacade() {
initializeFacadeInternal(new Configuration());
}
/**
 * Records the configuration and then builds, in order: the retry-wrapped
 * state store (which is initialized), the sub-cluster resolver (which is
 * loaded) and finally the cache.
 *
 * @param config the configuration used to create every collaborator
 * @throws RuntimeException wrapping the {@link YarnException} raised by any
 *           of the steps above; the failure is logged first
 */
private void initializeFacadeInternal(Configuration config) {
  this.conf = config;
  try {
    final RetryPolicy storeRetryPolicy = createRetryPolicy(conf);
    final Object storeProxy = createRetryInstance(this.conf,
        YarnConfiguration.FEDERATION_STATESTORE_CLIENT_CLASS,
        YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_CLIENT_CLASS,
        FederationStateStore.class, storeRetryPolicy);
    this.stateStore = (FederationStateStore) storeProxy;
    this.stateStore.init(conf);

    final SubClusterResolver resolver = createInstance(conf,
        YarnConfiguration.FEDERATION_CLUSTER_RESOLVER_CLASS,
        YarnConfiguration.DEFAULT_FEDERATION_CLUSTER_RESOLVER_CLASS,
        SubClusterResolver.class);
    this.subclusterResolver = resolver;
    this.subclusterResolver.load();

    initCache();
  } catch (YarnException ex) {
    LOG.error("Failed to initialize the FederationStateStoreFacade object",
        ex);
    throw new RuntimeException(ex);
  }
}
/**
 * Replaces the state store and configuration held by the facade, then
 * destroys the current cache and builds a new one from the given
 * configuration.
 *
 * @param store the {@link FederationStateStore} instance to use from now on
 * @param config the configuration the cache is rebuilt from
 */
@VisibleForTesting
public synchronized void reinitialize(FederationStateStore store,
    Configuration config) {
  this.stateStore = store;
  this.conf = config;
  // The old cache must be gone before initCache() looks it up by name.
  clearCache();
  initCache();
}
/**
 * Creates the {@link RetryPolicy} used by {@code FederationStateStoreFacade}.
 * An exponential back-off policy is registered for:
 * <ul>
 * <li>{@code FederationStateStoreRetriableException}</li>
 * <li>{@code CacheLoaderException}</li>
 * <li>{@code PoolInitializationException}</li>
 * </ul>
 * Every other exception is handled by
 * {@code RetryPolicies.TRY_ONCE_THEN_FAIL}.
 *
 * @param conf the configuration supplying the retry count and the base
 *          sleep time in milliseconds
 * @return the RetryPolicy for FederationStateStoreFacade
 */
public static RetryPolicy createRetryPolicy(Configuration conf) {
  // Retry settings for the state store; Integer.SIZE is the fallback count.
  final int maxRetries =
      conf.getInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, Integer.SIZE);
  final long sleepBaseMs = conf.getLong(
      YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS,
      YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS);
  final RetryPolicy backoffPolicy = RetryPolicies.exponentialBackoffRetry(
      maxRetries, sleepBaseMs, TimeUnit.MILLISECONDS);

  final Map<Class<? extends Exception>, RetryPolicy> policyByException =
      new HashMap<>();
  policyByException.put(FederationStateStoreRetriableException.class,
      backoffPolicy);
  policyByException.put(CacheLoaderException.class, backoffPolicy);
  policyByException.put(PoolInitializationException.class, backoffPolicy);

  return RetryPolicies.retryByException(
      RetryPolicies.TRY_ONCE_THEN_FAIL, policyByException);
}
/**
 * Tells whether lookups go through the cache.
 *
 * @return {@code true} when the configured time-to-live is greater than zero
 */
private boolean isCachingEnabled() {
  return 0 < cacheTimeToLive;
}
/**
 * Reads the cache time-to-live from the configuration and, when caching is
 * enabled, binds {@code cache} to the JCache named after this class,
 * creating it if the cache manager does not have one yet. A newly created
 * cache stores by reference, reads through {@link CacheLoaderImpl} and
 * expires entries the configured number of seconds after creation.
 */
private void initCache() {
  // The JCache provider is whichever one is found on the classpath; make
  // sure there is no conflict, or pick a specific one in the future.
  cacheTimeToLive = conf.getInt(
      YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS,
      YarnConfiguration.DEFAULT_FEDERATION_CACHE_TIME_TO_LIVE_SECS);
  if (!isCachingEnabled()) {
    return;
  }

  final String cacheName = this.getClass().getSimpleName();
  final CachingProvider provider = Caching.getCachingProvider();
  final CacheManager manager = provider.getCacheManager();

  // Reuse a cache that is already registered under our name.
  this.cache = manager.getCache(cacheName);
  if (this.cache != null) {
    return;
  }

  LOG.info("Creating a JCache Manager with name "
      + cacheName);
  final Duration timeToLive = new Duration(TimeUnit.SECONDS, cacheTimeToLive);
  final FactoryBuilder.SingletonFactory<ExpiryPolicy> expiryFactory =
      new FactoryBuilder.SingletonFactory<ExpiryPolicy>(
          new CreatedExpiryPolicy(timeToLive));
  final FactoryBuilder.SingletonFactory<CacheLoader<Object, Object>>
      loaderFactory =
      new FactoryBuilder.SingletonFactory<CacheLoader<Object, Object>>(
          new CacheLoaderImpl<Object, Object>());
  final CompleteConfiguration<Object, Object> cacheConfig =
      new MutableConfiguration<Object, Object>()
          .setStoreByValue(false)
          .setReadThrough(true)
          .setExpiryPolicyFactory(expiryFactory)
          .setCacheLoaderFactory(loaderFactory);
  this.cache = manager.createCache(cacheName, cacheConfig);
}
/**
 * Destroys the JCache registered under this class's simple name and drops
 * the facade's reference to it.
 */
private void clearCache() {
  final String cacheName = this.getClass().getSimpleName();
  final CachingProvider provider = Caching.getCachingProvider();
  final CacheManager manager = provider.getCacheManager();
  manager.destroyCache(cacheName);
  this.cache = null;
}
/**
 * Provides access to the single facade instance held by this class.
 *
 * @return the shared {@link FederationStateStoreFacade} instance
 */
public static FederationStateStoreFacade getInstance() {
  return FederationStateStoreFacade.FACADE;
}
/**
 * Looks up the {@link SubClusterInfo} of one sub-cluster. With caching
 * enabled the answer comes from the cached, unfiltered map of all
 * sub-clusters; otherwise the state store is queried directly.
 *
 * @param subClusterId the identifier of the sub-cluster
 * @return the sub cluster information, or {@code null} if there is no
 *         mapping for the subClusterId or the store returns no response
 * @throws YarnException if the call to the state store is unsuccessful
 */
public SubClusterInfo getSubCluster(final SubClusterId subClusterId)
    throws YarnException {
  if (!isCachingEnabled()) {
    final GetSubClusterInfoRequest request =
        GetSubClusterInfoRequest.newInstance(subClusterId);
    final GetSubClusterInfoResponse response =
        stateStore.getSubCluster(request);
    return response == null ? null : response.getSubClusterInfo();
  }
  final Map<SubClusterId, SubClusterInfo> allSubClusters =
      getSubClusters(false);
  return allSubClusters.get(subClusterId);
}
/**
 * Returns the {@link SubClusterInfo} for the specified {@link SubClusterId},
 * optionally evicting the cached map of sub-clusters first so that it is
 * reloaded from the central {@link FederationStateStore}.
 *
 * @param subClusterId the identifier of the sub-cluster
 * @param flushCache whether the cached sub-clusters entry should be removed
 *          before the lookup; ignored when caching is disabled
 * @return the sub cluster information
 * @throws YarnException if the call to the state store is unsuccessful
 */
public SubClusterInfo getSubCluster(final SubClusterId subClusterId,
    final boolean flushCache) throws YarnException {
  final boolean evictFirst = flushCache && isCachingEnabled();
  if (evictFirst) {
    LOG.info("Flushing subClusters from cache and rehydrating from store,"
        + " most likely on account of RM failover.");
    // Same key the single-argument lookup reads through.
    final Object subClustersKey = buildGetSubClustersCacheRequest(false);
    cache.remove(subClustersKey);
  }
  return getSubCluster(subClusterId);
}
/**
 * Returns the {@link SubClusterInfo} of the known sub-clusters keyed by
 * their {@link SubClusterId}, read through the cache when caching is
 * enabled and straight from the state store otherwise.
 *
 * @param filterInactiveSubClusters whether to filter out inactive
 *          sub-clusters
 * @return the sub-cluster information, keyed by sub-cluster identifier
 * @throws YarnException wrapping any failure raised during the lookup
 */
@SuppressWarnings("unchecked")
public Map<SubClusterId, SubClusterInfo> getSubClusters(
    final boolean filterInactiveSubClusters) throws YarnException {
  try {
    if (!isCachingEnabled()) {
      final GetSubClustersInfoRequest request =
          GetSubClustersInfoRequest.newInstance(filterInactiveSubClusters);
      return buildSubClusterInfoMap(stateStore.getSubClusters(request));
    }
    final Object cacheKey =
        buildGetSubClustersCacheRequest(filterInactiveSubClusters);
    return (Map<SubClusterId, SubClusterInfo>) cache.get(cacheKey);
  } catch (Throwable ex) {
    throw new YarnException(ex);
  }
}
/**
 * Looks up the {@link SubClusterPolicyConfiguration} of one queue. With
 * caching enabled the answer comes from the cached map of all policies;
 * otherwise the state store is queried for that queue only.
 *
 * @param queue the queue whose policy is required
 * @return the corresponding configured policy, or {@code null} if there is
 *         no mapping for the queue or the store returns no response
 * @throws YarnException if the call to the state store is unsuccessful
 */
public SubClusterPolicyConfiguration getPolicyConfiguration(
    final String queue) throws YarnException {
  if (!isCachingEnabled()) {
    final GetSubClusterPolicyConfigurationRequest request =
        GetSubClusterPolicyConfigurationRequest.newInstance(queue);
    final GetSubClusterPolicyConfigurationResponse response =
        stateStore.getPolicyConfiguration(request);
    return response == null ? null : response.getPolicyConfiguration();
  }
  final Map<String, SubClusterPolicyConfiguration> policiesByQueue =
      getPoliciesConfigurations();
  return policiesByQueue.get(queue);
}
/**
 * Returns every {@link SubClusterPolicyConfiguration} known to the state
 * store, keyed by queue name. The result is read through the cache when
 * caching is enabled and fetched from the state store otherwise.
 *
 * @return the policies for all currently active queues in the system
 * @throws YarnException wrapping any failure raised during the lookup
 */
@SuppressWarnings("unchecked")
public Map<String, SubClusterPolicyConfiguration> getPoliciesConfigurations()
    throws YarnException {
  try {
    if (!isCachingEnabled()) {
      final GetSubClusterPoliciesConfigurationsRequest request =
          GetSubClusterPoliciesConfigurationsRequest.newInstance();
      return buildPolicyConfigMap(
          stateStore.getPoliciesConfigurations(request));
    }
    final Object cacheKey = buildGetPoliciesConfigurationsCacheRequest();
    return (Map<String, SubClusterPolicyConfiguration>) cache.get(cacheKey);
  } catch (Throwable ex) {
    throw new YarnException(ex);
  }
}
/**
 * Registers the home {@link SubClusterId} of an {@link ApplicationId} in the
 * state store.
 *
 * @param appHomeSubCluster the mapping of the application to its home
 *          sub-cluster
 * @return the home sub-cluster reported back by the state store
 * @throws YarnException if the call to the state store is unsuccessful
 */
public SubClusterId addApplicationHomeSubCluster(
    ApplicationHomeSubCluster appHomeSubCluster) throws YarnException {
  final AddApplicationHomeSubClusterRequest request =
      AddApplicationHomeSubClusterRequest.newInstance(appHomeSubCluster);
  final AddApplicationHomeSubClusterResponse response =
      stateStore.addApplicationHomeSubCluster(request);
  return response.getHomeSubCluster();
}
/**
 * Overwrites the home {@link SubClusterId} recorded for an
 * {@link ApplicationId} in the state store.
 *
 * @param appHomeSubCluster the mapping of the application to its home
 *          sub-cluster
 * @throws YarnException if the call to the state store is unsuccessful
 */
public void updateApplicationHomeSubCluster(
    ApplicationHomeSubCluster appHomeSubCluster) throws YarnException {
  final UpdateApplicationHomeSubClusterRequest request =
      UpdateApplicationHomeSubClusterRequest.newInstance(appHomeSubCluster);
  stateStore.updateApplicationHomeSubCluster(request);
}
/**
 * Returns the home {@link SubClusterId} of an {@link ApplicationId}, read
 * through the cache when caching is enabled and fetched from the state
 * store otherwise.
 *
 * @param appId the identifier of the application
 * @return the home sub cluster identifier
 * @throws YarnException wrapping any failure raised during the lookup
 */
public SubClusterId getApplicationHomeSubCluster(ApplicationId appId)
    throws YarnException {
  try {
    if (!isCachingEnabled()) {
      final GetApplicationHomeSubClusterRequest request =
          GetApplicationHomeSubClusterRequest.newInstance(appId);
      final GetApplicationHomeSubClusterResponse response =
          stateStore.getApplicationHomeSubCluster(request);
      final ApplicationHomeSubCluster appHome =
          response.getApplicationHomeSubCluster();
      return appHome.getHomeSubCluster();
    }
    final Object cacheKey = buildGetApplicationHomeSubClusterRequest(appId);
    return SubClusterId.class.cast(cache.get(cacheKey));
  } catch (Throwable ex) {
    throw new YarnException(ex);
  }
}
/**
 * Exposes the {@link SubClusterResolver} held by this facade.
 *
 * @return SubClusterResolver instance
 */
public SubClusterResolver getSubClusterResolver() {
  return subclusterResolver;
}
/**
 * Exposes the configuration currently held by this facade.
 *
 * @return configuration object
 */
public Configuration getConf() {
  return conf;
}
/**
 * Registers the home {@link SubClusterId} of a {@link ReservationId} in the
 * state store.
 *
 * @param appHomeSubCluster the mapping of the reservation to its home
 *          sub-cluster
 * @return the home sub-cluster reported back by the state store
 * @throws YarnException if the call to the state store is unsuccessful
 */
public SubClusterId addReservationHomeSubCluster(ReservationHomeSubCluster appHomeSubCluster)
    throws YarnException {
  final AddReservationHomeSubClusterRequest request =
      AddReservationHomeSubClusterRequest.newInstance(appHomeSubCluster);
  final AddReservationHomeSubClusterResponse response =
      stateStore.addReservationHomeSubCluster(request);
  return response.getHomeSubCluster();
}
/**
 * Fetches the home {@link SubClusterId} of a {@link ReservationId} from the
 * state store. This lookup does not go through the cache.
 *
 * @param reservationId the identifier of the reservation
 * @return the home subCluster identifier
 * @throws YarnException if the call to the state store is unsuccessful
 */
public SubClusterId getReservationHomeSubCluster(ReservationId reservationId)
    throws YarnException {
  final GetReservationHomeSubClusterRequest request =
      GetReservationHomeSubClusterRequest.newInstance(reservationId);
  final GetReservationHomeSubClusterResponse response =
      stateStore.getReservationHomeSubCluster(request);
  final ReservationHomeSubCluster reservationHome =
      response.getReservationHomeSubCluster();
  return reservationHome.getHomeSubCluster();
}
/**
 * Overwrites the home {@link SubClusterId} recorded for a
 * {@link ReservationId} in the state store.
 *
 * @param appHomeSubCluster the mapping of the reservation to its home
 *          sub-cluster
 * @throws YarnException if the call to the state store is unsuccessful
 */
public void updateReservationHomeSubCluster(ReservationHomeSubCluster appHomeSubCluster)
    throws YarnException {
  stateStore.updateReservationHomeSubCluster(
      UpdateReservationHomeSubClusterRequest.newInstance(appHomeSubCluster));
}
/**
 * Removes the home {@link SubClusterId} recorded for a
 * {@link ReservationId} from the state store.
 *
 * @param reservationId the identifier of the reservation
 * @throws YarnException if the call to the state store is unsuccessful
 */
public void deleteReservationHomeSubCluster(ReservationId reservationId) throws YarnException {
  stateStore.deleteReservationHomeSubCluster(
      DeleteReservationHomeSubClusterRequest.newInstance(reservationId));
}
/**
 * Instantiates the class named in the configuration (see
 * {@link #createInstance}) and wraps it in a {@link RetryProxy} governed by
 * the given {@link RetryPolicy}.
 *
 * @param conf the yarn configuration
 * @param configuredClassName the configuration provider key
 * @param defaultValue the default implementation for fallback
 * @param type the class for which a retry proxy is required
 * @param retryPolicy the policy for retrying method call failures
 * @param <T> the interface implemented by the proxied instance
 * @return a retry proxy for the specified interface
 */
public static <T> Object createRetryInstance(Configuration conf,
    String configuredClassName, String defaultValue, Class<T> type,
    RetryPolicy retryPolicy) {
  final T delegate =
      createInstance(conf, configuredClassName, defaultValue, type);
  return RetryProxy.create(type, delegate, retryPolicy);
}
/**
 * Instantiates the class whose name is stored under the given configuration
 * key, falling back to the default class name when the key is unset.
 *
 * @param conf the yarn configuration
 * @param configuredClassName the configuration provider key
 * @param defaultValue the default implementation class
 * @param type the required interface/base class
 * @param <T> The type of the instance to create
 * @return the instance created
 * @throws YarnRuntimeException if the class cannot be found or is not
 *           assignable to {@code type}
 */
@SuppressWarnings("unchecked")
public static <T> T createInstance(Configuration conf,
    String configuredClassName, String defaultValue, Class<T> type) {
  final String className = conf.get(configuredClassName, defaultValue);

  final Class<?> implementationClass;
  try {
    implementationClass = conf.getClassByName(className);
  } catch (ClassNotFoundException e) {
    throw new YarnRuntimeException("Could not instantiate : " + className, e);
  }

  if (!type.isAssignableFrom(implementationClass)) {
    throw new YarnRuntimeException("Class: " + className
        + " not instance of " + type.getCanonicalName());
  }
  return (T) ReflectionUtils.newInstance(implementationClass, conf);
}
/**
 * Indexes the sub-clusters of a state-store response by their identifier.
 *
 * @param response the response listing the sub-clusters
 * @return a new map from {@link SubClusterId} to {@link SubClusterInfo}
 */
private Map<SubClusterId, SubClusterInfo> buildSubClusterInfoMap(
    final GetSubClustersInfoResponse response) {
  final List<SubClusterInfo> infos = response.getSubClusters();
  final Map<SubClusterId, SubClusterInfo> infoById =
      new HashMap<>(infos.size());
  infos.forEach(info -> infoById.put(info.getSubClusterId(), info));
  return infoById;
}
/**
 * Builds the cache key for the "all sub-clusters" lookup. The key carries
 * the function that loads the value from the state store on a cache miss.
 *
 * @param filterInactiveSubClusters whether inactive sub-clusters are
 *          filtered out; also part of the cache key
 * @return the {@link CacheRequest} to hand to the cache
 */
private Object buildGetSubClustersCacheRequest(
    final boolean filterInactiveSubClusters) {
  final String cacheKey = buildCacheKey(getClass().getSimpleName(),
      GET_SUBCLUSTERS_CACHEID, Boolean.toString(filterInactiveSubClusters));
  final Func<String, Map<SubClusterId, SubClusterInfo>> loader = key -> {
    final GetSubClustersInfoRequest request =
        GetSubClustersInfoRequest.newInstance(filterInactiveSubClusters);
    final GetSubClustersInfoResponse subClusters =
        stateStore.getSubClusters(request);
    return buildSubClusterInfoMap(subClusters);
  };
  return new CacheRequest<String, Map<SubClusterId, SubClusterInfo>>(
      cacheKey, loader);
}
/**
 * Indexes the policy configurations of a state-store response by queue.
 *
 * @param response the response listing the policy configurations
 * @return a new map from queue name to
 *         {@link SubClusterPolicyConfiguration}
 */
private Map<String, SubClusterPolicyConfiguration> buildPolicyConfigMap(
    GetSubClusterPoliciesConfigurationsResponse response) {
  final List<SubClusterPolicyConfiguration> configs =
      response.getPoliciesConfigs();
  final Map<String, SubClusterPolicyConfiguration> configByQueue =
      new HashMap<>();
  configs.forEach(config -> configByQueue.put(config.getQueue(), config));
  return configByQueue;
}
/**
 * Builds the cache key for the "all policy configurations" lookup. The key
 * carries the function that loads the value from the state store on a cache
 * miss.
 *
 * @return the {@link CacheRequest} to hand to the cache
 */
private Object buildGetPoliciesConfigurationsCacheRequest() {
  final String cacheKey = buildCacheKey(getClass().getSimpleName(),
      GET_POLICIES_CONFIGURATIONS_CACHEID, null);
  final Func<String, Map<String, SubClusterPolicyConfiguration>> loader =
      key -> {
        final GetSubClusterPoliciesConfigurationsRequest request =
            GetSubClusterPoliciesConfigurationsRequest.newInstance();
        final GetSubClusterPoliciesConfigurationsResponse policyConfigs =
            stateStore.getPoliciesConfigurations(request);
        return buildPolicyConfigMap(policyConfigs);
      };
  return new CacheRequest<String, Map<String, SubClusterPolicyConfiguration>>(
      cacheKey, loader);
}
/**
 * Builds the cache key for the home sub-cluster lookup of one application.
 * The key carries the function that loads the value from the state store on
 * a cache miss.
 *
 * @param applicationId the application whose home sub-cluster is wanted;
 *          its string form is part of the cache key
 * @return the {@link CacheRequest} to hand to the cache
 */
private Object buildGetApplicationHomeSubClusterRequest(ApplicationId applicationId) {
  final String cacheKey = buildCacheKey(getClass().getSimpleName(),
      GET_APPLICATION_HOME_SUBCLUSTER_CACHEID, applicationId.toString());
  final Func<String, SubClusterId> loader = key -> {
    final GetApplicationHomeSubClusterResponse response =
        stateStore.getApplicationHomeSubCluster(
            GetApplicationHomeSubClusterRequest.newInstance(applicationId));
    final ApplicationHomeSubCluster appHome =
        response.getApplicationHomeSubCluster();
    return appHome.getHomeSubCluster();
  };
  return new CacheRequest<String, SubClusterId>(cacheKey, loader);
}
/**
 * Assembles a cache key of the form {@code typeName.methodName}, followed by
 * {@code ::argName} when an argument name is supplied.
 *
 * @param typeName the first component of the key
 * @param methodName the second component of the key
 * @param argName the optional argument component; omitted when {@code null}
 * @return the assembled key
 */
protected String buildCacheKey(String typeName, String methodName,
    String argName) {
  final String prefix = typeName + "." + methodName;
  return argName == null ? prefix : prefix + "::" + argName;
}
/**
 * {@link CacheLoader} plugged into the cache so that a miss is resolved by
 * the key itself: every key is expected to be a {@link CacheRequest}, whose
 * {@code getValue()} produces the value to cache.
 */
private static class CacheLoaderImpl<K, V> implements CacheLoader<K, V> {

  /**
   * Loads the value for a key by invoking the request carried in the key.
   *
   * @param key the {@link CacheRequest} describing what to load
   * @return the value produced by the request
   * @throws CacheLoaderException wrapping anything thrown while loading
   */
  @SuppressWarnings("unchecked")
  @Override
  public V load(K key) throws CacheLoaderException {
    final V loaded;
    try {
      final CacheRequest<K, V> request = (CacheRequest<K, V>) key;
      assert request != null;
      loaded = request.getValue();
    } catch (Throwable cause) {
      throw new CacheLoaderException(cause);
    }
    return loaded;
  }

  /**
   * Not supported: the facade never uses the cache's getAll API.
   *
   * @param keys ignored
   * @return never returns normally
   * @throws NotImplementedException always
   */
  @Override
  public Map<K, V> loadAll(Iterable<? extends K> keys)
      throws CacheLoaderException {
    throw new NotImplementedException("Code is not implemented");
  }
}
/**
 * Pairs a cache key with the function that computes the value for that key.
 * Equality and hashing consider the key only, so two requests for the same
 * key are interchangeable regardless of the function they carry.
 */
private static class CacheRequest<K, V> {
  private K key;
  private Func<K, V> func;

  CacheRequest(K key, Func<K, V> func) {
    this.key = key;
    this.func = func;
  }

  /**
   * Computes the value by applying the function to the key.
   *
   * @return the value produced by the function
   * @throws Exception whatever the function throws
   */
  public V getValue() throws Exception {
    return func.invoke(key);
  }

  @Override
  public int hashCode() {
    final int keyHash = (key == null) ? 0 : key.hashCode();
    return 31 + keyHash;
  }

  @SuppressWarnings("unchecked")
  @Override
  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (obj == null || getClass() != obj.getClass()) {
      return false;
    }
    final CacheRequest<K, V> that = (CacheRequest<K, V>) obj;
    return (key == null) ? (that.key == null) : key.equals(that.key);
  }
}
/**
* Encapsulates a method that has one parameter and returns a value of the
* type specified by the TResult parameter. It has a single abstract method,
* so it can be implemented with a lambda, and unlike
* {@code java.util.function.Function} that method may throw a checked
* exception.
*
* @param <T> the type of the single input argument
* @param <TResult> the type of the value produced
*/
protected interface Func<T, TResult> {
/**
* Computes the result for the given input.
*
* @param input the argument to compute the result from
* @return the computed result
* @throws Exception if the result cannot be computed
*/
TResult invoke(T input) throws Exception;
}
/**
 * Exposes the underlying cache for tests.
 *
 * @return the cache currently held by the facade; may be {@code null}
 */
@VisibleForTesting
public Cache<Object, Object> getCache() {
  return this.cache;
}
/**
 * Exposes, for tests, the cache key used for the home sub-cluster lookup of
 * an application.
 *
 * @param applicationId the application the key is built for
 * @return the cache request object used as the key
 */
@VisibleForTesting
protected Object getAppHomeSubClusterCacheRequest(ApplicationId applicationId) {
  final Object cacheRequest =
      buildGetApplicationHomeSubClusterRequest(applicationId);
  return cacheRequest;
}
/**
 * Exposes the state store client for tests.
 *
 * @return the {@link FederationStateStore} currently held by the facade
 */
@VisibleForTesting
public FederationStateStore getStateStore() {
  return this.stateStore;
}
/**
 * Stores a new master key in the state store as a {@link RouterMasterKey}
 * built from the key's id, encoded bytes and expiry date.
 *
 * @param newKey Key used for generating and verifying delegation tokens
 * @return RouterMasterKeyResponse
 * @throws YarnException if the call to the state store is unsuccessful
 * @throws IOException An IO Error occurred
 */
public RouterMasterKeyResponse storeNewMasterKey(DelegationKey newKey)
    throws YarnException, IOException {
  LOG.info("Storing master key with keyID {}.", newKey.getKeyId());
  final RouterMasterKey masterKey = RouterMasterKey.newInstance(
      newKey.getKeyId(),
      ByteBuffer.wrap(newKey.getEncodedKey()),
      newKey.getExpiryDate());
  return stateStore.storeNewMasterKey(
      RouterMasterKeyRequest.newInstance(masterKey));
}
/**
 * Removes a master key from the state store. The key is sent as a
 * {@link RouterMasterKey} built from its id, encoded bytes and expiry date.
 *
 * @param newKey Key used for generating and verifying delegation tokens
 * @throws YarnException if the call to the state store is unsuccessful
 * @throws IOException An IO Error occurred
 */
public void removeStoredMasterKey(DelegationKey newKey) throws YarnException, IOException {
  LOG.info("Removing master key with keyID {}.", newKey.getKeyId());
  final RouterMasterKey masterKey = RouterMasterKey.newInstance(
      newKey.getKeyId(),
      ByteBuffer.wrap(newKey.getEncodedKey()),
      newKey.getExpiryDate());
  stateStore.removeStoredMasterKey(
      RouterMasterKeyRequest.newInstance(masterKey));
}
/**
 * Looks up a master key in the state store. The lookup request carries a
 * {@link RouterMasterKey} built from the given key's id, encoded bytes and
 * expiry date.
 *
 * @param newKey Key used for generating and verifying delegation tokens
 * @return RouterMasterKeyResponse
 * @throws YarnException if the call to the state store is unsuccessful
 * @throws IOException An IO Error occurred
 */
public RouterMasterKeyResponse getMasterKeyByDelegationKey(DelegationKey newKey)
    throws YarnException, IOException {
  // This is a read-only lookup, so the log line must say so rather than
  // look like a store operation in the logs.
  LOG.info("Getting master key with keyID {}.", newKey.getKeyId());
  ByteBuffer keyBytes = ByteBuffer.wrap(newKey.getEncodedKey());
  RouterMasterKey masterKey = RouterMasterKey.newInstance(newKey.getKeyId(),
      keyBytes, newKey.getExpiryDate());
  RouterMasterKeyRequest keyRequest = RouterMasterKeyRequest.newInstance(masterKey);
  return stateStore.getMasterKeyByDelegationKey(keyRequest);
}
}