/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.federation.utils;
import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.registry.client.api.BindFlags;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
 * Helper class that handles reads and writes to Yarn Registry to support UAM HA
 * and second attempt.
 *
 * <p>UAM tokens are stored in the registry under
 * {@code <registryBaseDir>/<AppId>/<SubClusterId>}. An in-memory copy of what
 * has been written or loaded is kept per application so that an unchanged
 * token is not written again. All public methods are synchronized on this
 * instance.
 */
public class FederationRegistryClient {
  private static final Logger LOG =
      LoggerFactory.getLogger(FederationRegistryClient.class);

  private static final String PATH_SEPARATOR = "/";

  private final RegistryOperations registry;

  private final UserGroupInformation user;

  // AppId -> SubClusterId -> UAM token
  private final Map<ApplicationId, Map<String, Token<AMRMTokenIdentifier>>>
      appSubClusterTokenMap;

  // Structure in registry: <registryBaseDir>/<AppId>/<SubClusterId> -> UAMToken
  private final String registryBaseDir;

  /**
   * Create a client on top of the given registry.
   *
   * @param conf configuration the registry base directory is read from
   * @param registry registry implementation used for all operations
   * @param user the user all registry operations are executed as
   */
  public FederationRegistryClient(Configuration conf,
      RegistryOperations registry, UserGroupInformation user) {
    this.registry = registry;
    this.user = user;
    this.appSubClusterTokenMap = new ConcurrentHashMap<>();

    String baseDir = conf.get(YarnConfiguration.FEDERATION_REGISTRY_BASE_KEY,
        YarnConfiguration.DEFAULT_FEDERATION_REGISTRY_BASE_KEY);
    // getRegistryKey() appends the application id directly to the base
    // directory, so a configured value without a trailing separator would
    // produce keys like "<base><AppId>" that listing "<base>" never finds.
    if (baseDir != null && !baseDir.isEmpty()
        && !baseDir.endsWith(PATH_SEPARATOR)) {
      baseDir = baseDir + PATH_SEPARATOR;
    }
    this.registryBaseDir = baseDir;

    LOG.info("Using registry {} with base directory: {}",
        this.registry.getClass().getName(), this.registryBaseDir);
  }

  /**
   * Get the list of known applications in the registry.
   *
   * @return the list of known applications, empty (never null) when the base
   *         directory cannot be listed
   */
  public synchronized List<String> getAllApplications() {
    List<String> applications = null;
    try {
      // throwIfFails is false because it is valid that the entry does not
      // exist; listDirRegistry then returns null instead of throwing
      applications = listDirRegistry(this.registry, this.user,
          getRegistryKey(null, null), false);
    } catch (YarnException e) {
      LOG.warn("Unexpected exception from listDirRegistry", e);
    }
    if (applications == null) {
      // It is valid for listDirRegistry to return null
      return new ArrayList<>();
    }
    return applications;
  }

  /**
   * For testing, delete all application records in registry.
   */
  @VisibleForTesting
  public synchronized void cleanAllApplications() {
    try {
      // Recursive delete of the base directory; failures are not propagated
      removeKeyRegistry(this.registry, this.user, getRegistryKey(null, null),
          true, false);
    } catch (YarnException e) {
      LOG.warn("Unexpected exception from removeKeyRegistry", e);
    }
  }

  /**
   * Write/update the UAM token for an application and a sub-cluster.
   *
   * <p>The registry is only written when the token differs from the one
   * remembered in memory for the same application and sub-cluster. The
   * in-memory copy is updated only after the registry write succeeded.
   *
   * @param appId application id the token belongs to
   * @param subClusterId sub-cluster id of the token
   * @param token the UAM of the application, must not be null
   * @return whether the amrmToken is added or updated to a new value; false
   *         when the same token is already known or when the registry write
   *         failed
   */
  public synchronized boolean writeAMRMTokenForUAM(ApplicationId appId,
      String subClusterId, Token<AMRMTokenIdentifier> token) {
    Map<String, Token<AMRMTokenIdentifier>> subClusterTokenMap =
        this.appSubClusterTokenMap.get(appId);
    if (subClusterTokenMap == null) {
      subClusterTokenMap = new ConcurrentHashMap<>();
      this.appSubClusterTokenMap.put(appId, subClusterTokenMap);
    }

    if (token.equals(subClusterTokenMap.get(subClusterId))) {
      LOG.debug("Same amrmToken received from {}, skip writing registry for {}",
          subClusterId, appId);
      return false;
    }

    LOG.info("Writing/Updating amrmToken for {} to registry for {}",
        subClusterId, appId);
    try {
      // First, write the token entry
      writeRegistry(this.registry, this.user,
          getRegistryKey(appId, subClusterId), token.encodeToUrlString(), true);

      // Then update the subClusterTokenMap
      subClusterTokenMap.put(subClusterId, token);
      return true;
    } catch (YarnException | IOException e) {
      LOG.error("Failed writing AMRMToken to registry for subcluster {}",
          subClusterId, e);
      // Nothing was added or updated, so do not report an update
      return false;
    }
  }

  /**
   * Load the information of one application from registry.
   *
   * <p>Sub-clusters whose token cannot be read or decoded are skipped. When
   * the application exists in the registry, the in-memory state kept for it
   * is replaced by what was loaded.
   *
   * @param appId application id
   * @return the sub-cluster to UAM token mapping, empty when the application
   *         does not exist in registry
   */
  public synchronized Map<String, Token<AMRMTokenIdentifier>>
      loadStateFromRegistry(ApplicationId appId) {
    Map<String, Token<AMRMTokenIdentifier>> retMap = new HashMap<>();

    List<String> subclusters = null;
    try {
      // throwIfFails is false because it is valid that the entry does not
      // exist
      subclusters = listDirRegistry(this.registry, this.user,
          getRegistryKey(appId, null), false);
    } catch (YarnException e) {
      LOG.warn("Unexpected exception from listDirRegistry", e);
    }

    if (subclusters == null) {
      LOG.info("Application {} does not exist in registry", appId);
      return retMap;
    }

    // Read the amrmToken for each sub-cluster with an existing UAM
    for (String scId : subclusters) {
      LOG.info("Reading amrmToken for subcluster {} for {}", scId, appId);
      String key = getRegistryKey(appId, scId);
      try {
        // With throwIfFails set, readRegistry throws instead of returning
        // null, so no extra null check is needed here
        String tokenString = readRegistry(this.registry, this.user, key, true);

        Token<AMRMTokenIdentifier> amrmToken = new Token<>();
        amrmToken.decodeFromUrlString(tokenString);
        // Clear the service field, as if RM just issued the token
        amrmToken.setService(new Text());

        retMap.put(scId, amrmToken);
      } catch (Exception e) {
        // Best effort: one unreadable entry must not fail the whole load
        LOG.error("Failed reading registry key {}, skipping subcluster {}",
            key, scId, e);
      }
    }

    // Override existing map if there
    this.appSubClusterTokenMap.put(appId, new ConcurrentHashMap<>(retMap));
    return retMap;
  }

  /**
   * Remove an application from registry.
   *
   * <p>The registry is only touched when this client holds in-memory tokens
   * for the application. The in-memory entry is dropped once the registry
   * directory has been removed, and kept when the removal fails.
   *
   * @param appId application id
   */
  public synchronized void removeAppFromRegistry(ApplicationId appId) {
    Map<String, Token<AMRMTokenIdentifier>> subClusterTokenMap =
        this.appSubClusterTokenMap.get(appId);
    if (subClusterTokenMap == null || subClusterTokenMap.isEmpty()) {
      // Nothing is known to be written for this application; only make sure
      // no empty placeholder map is left behind
      this.appSubClusterTokenMap.remove(appId);
      return;
    }

    LOG.info("Removing all registry entries for {}", appId);

    // Remove the application directory together with all its entries
    String key = getRegistryKey(appId, null);
    try {
      removeKeyRegistry(this.registry, this.user, key, true, true);
      // Drop the whole entry rather than just clearing it, otherwise one
      // empty map per finished application would accumulate
      this.appSubClusterTokenMap.remove(appId);
    } catch (YarnException e) {
      LOG.error("Failed removing registry directory key {}", key, e);
    }
  }

  /**
   * Build the registry key for the base directory (appId is null), for an
   * application directory (fileName is null) or for an entry inside an
   * application directory.
   */
  private String getRegistryKey(ApplicationId appId, String fileName) {
    if (appId == null) {
      return this.registryBaseDir;
    }
    if (fileName == null) {
      return this.registryBaseDir + appId;
    }
    return this.registryBaseDir + appId + PATH_SEPARATOR + fileName;
  }

  /**
   * Handle a failed registry operation: throw when the caller asked for it,
   * otherwise leave a debug trace so that the failure is not lost.
   *
   * @param message description of the failed operation
   * @param cause the exception raised by the registry, null if there is none
   * @param throwIfFails whether the failure is reported to the caller
   * @throws YarnException when throwIfFails is set
   */
  private static void handleFailure(String message, Throwable cause,
      boolean throwIfFails) throws YarnException {
    if (throwIfFails) {
      throw new YarnException(message, cause);
    }
    if (cause != null) {
      LOG.debug(message, cause);
    }
  }

  /**
   * Read the description of the record stored under the given key.
   *
   * @return the stored value, or null when it cannot be read and
   *         throwIfFails is not set
   * @throws YarnException when nothing can be read and throwIfFails is set
   */
  private String readRegistry(final RegistryOperations registryImpl,
      UserGroupInformation ugi, final String key, final boolean throwIfFails)
      throws YarnException {
    // Carries the failure out of the anonymous class
    final Throwable[] failure = new Throwable[1];

    // Use the ugi loaded with app credentials to access registry
    String result = ugi.doAs(new PrivilegedAction<String>() {
      @Override
      public String run() {
        try {
          ServiceRecord value = registryImpl.resolve(key);
          if (value != null) {
            return value.description;
          }
        } catch (Throwable e) {
          failure[0] = e;
        }
        return null;
      }
    });

    if (result == null) {
      handleFailure("Registry resolve key " + key + " failed", failure[0],
          throwIfFails);
    }
    return result;
  }

  /**
   * Delete the given key from the registry.
   *
   * @throws YarnException when the delete fails and throwIfFails is set
   */
  private void removeKeyRegistry(final RegistryOperations registryImpl,
      UserGroupInformation ugi, final String key, final boolean recursive,
      final boolean throwIfFails) throws YarnException {
    // Carries the failure out of the anonymous class
    final Throwable[] failure = new Throwable[1];

    // Use the ugi loaded with app credentials to access registry
    ugi.doAs(new PrivilegedAction<Void>() {
      @Override
      public Void run() {
        try {
          registryImpl.delete(key, recursive);
        } catch (Throwable e) {
          failure[0] = e;
        }
        return null;
      }
    });

    if (failure[0] != null) {
      handleFailure("Registry remove key " + key + " failed", failure[0],
          throwIfFails);
    }
  }

  /**
   * Write registry entry, override if exists.
   *
   * @throws YarnException when the write fails and throwIfFails is set
   */
  private void writeRegistry(final RegistryOperations registryImpl,
      UserGroupInformation ugi, final String key, final String value,
      final boolean throwIfFails) throws YarnException {
    final ServiceRecord recordValue = new ServiceRecord();
    recordValue.description = value;

    // Carries the failure out of the anonymous class
    final Throwable[] failure = new Throwable[1];

    // Use the ugi loaded with app credentials to access registry
    ugi.doAs(new PrivilegedAction<Void>() {
      @Override
      public Void run() {
        try {
          registryImpl.bind(key, recordValue, BindFlags.OVERWRITE);
        } catch (Throwable e) {
          failure[0] = e;
        }
        return null;
      }
    });

    if (failure[0] != null) {
      handleFailure("Registry write key " + key + " failed", failure[0],
          throwIfFails);
    }
  }

  /**
   * List the sub directories in the given directory.
   *
   * @return the listing, or null when it cannot be obtained and throwIfFails
   *         is not set
   * @throws YarnException when nothing can be listed and throwIfFails is set
   */
  private List<String> listDirRegistry(final RegistryOperations registryImpl,
      UserGroupInformation ugi, final String key, final boolean throwIfFails)
      throws YarnException {
    // Carries the failure out of the anonymous class
    final Throwable[] failure = new Throwable[1];

    // Use the ugi loaded with app credentials to access registry
    List<String> result = ugi.doAs(new PrivilegedAction<List<String>>() {
      @Override
      public List<String> run() {
        try {
          return registryImpl.list(key);
        } catch (Throwable e) {
          failure[0] = e;
        }
        return null;
      }
    });

    if (result == null) {
      handleFailure("Registry list key " + key + " failed", failure[0],
          throwIfFails);
    }
    return result;
  }
}