blob: 5357cc47e4a2f46387cddf4945463fe9408fa8a6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.core.launch;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.slider.common.SliderXmlConfKeys;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.text.DateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import static org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.*;
/**
* Utils to work with credentials and tokens.
*
* Designed to be movable to Hadoop core
*/
public final class CredentialUtils {

  /**
   * Configuration option which may name a token file; it is consulted by
   * {@link #locateEnvCredentials(Map, Configuration, StringBuffer)} when the
   * environment variable is not set.
   */
  public static final String JOB_CREDENTIALS_BINARY
      = SliderXmlConfKeys.MAPREDUCE_JOB_CREDENTIALS_BINARY;

  private static final Logger LOG =
      LoggerFactory.getLogger(CredentialUtils.class);

  /** Utility class: not instantiable. */
  private CredentialUtils() {
  }

  /**
   * Save credentials to a byte buffer. Returns null if there were no
   * credentials to save.
   * @param credentials credential set
   * @return a byte buffer of serialized tokens, or null if the credential
   * set contains no tokens
   * @throws IOException if the credentials could not be written to the stream
   */
  public static ByteBuffer marshallCredentials(Credentials credentials) throws IOException {
    if (credentials.getAllTokens().isEmpty()) {
      return null;
    }
    try (DataOutputBuffer dob = new DataOutputBuffer()) {
      credentials.writeTokenStorageToStream(dob);
      // wrap only the bytes actually written, not the whole backing array
      return ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
    }
  }

  /**
   * Locate a token file. The filename is taken from the environment variable
   * {@link UserGroupInformation#HADOOP_TOKEN_FILE_LOCATION} if it is present
   * in <code>env</code>; otherwise from the configuration option
   * {@link #JOB_CREDENTIALS_BINARY}.
   * @param env environment to resolve the variable from
   * @param conf configuration to fall back to
   * @param sourceTextOut buffer to which a description of the file and where
   * its name came from is appended when a file is returned
   * @return the token file, or null if neither the environment nor the
   * configuration named one
   * @throws FileNotFoundException if a filename was supplied but it does not
   * exist, is not a regular file, or cannot be read
   */
  public static File locateEnvCredentials(Map<String, String> env,
      Configuration conf,
      StringBuffer sourceTextOut) throws FileNotFoundException {
    String tokenFilename = env.get(HADOOP_TOKEN_FILE_LOCATION);
    String source = "environment variable " + HADOOP_TOKEN_FILE_LOCATION;
    if (tokenFilename == null) {
      tokenFilename = conf.get(JOB_CREDENTIALS_BINARY);
      source = "configuration option " + JOB_CREDENTIALS_BINARY;
    }
    if (tokenFilename == null) {
      return null;
    }
    // use delegation tokens, i.e. from Oozie
    File file = new File(tokenFilename.trim());
    String details = String.format(
        "Token File %s from %s",
        file,
        source);
    if (!file.exists()) {
      throw new FileNotFoundException("No " + details);
    }
    // reject if EITHER check fails: a readable directory or an unreadable
    // regular file is just as unusable as something which fails both
    if (!file.isFile() || !file.canRead()) {
      throw new FileNotFoundException("Cannot read " + details);
    }
    sourceTextOut.append(details);
    return file;
  }

  /**
   * Load the credentials from the environment. This looks at
   * the value of {@link UserGroupInformation#HADOOP_TOKEN_FILE_LOCATION}
   * (falling back to the configuration option {@link #JOB_CREDENTIALS_BINARY})
   * and attempts to read in the file it names.
   * @param env environment to resolve the variable from
   * @param conf configuration use when reading the tokens
   * @return a set of credentials, or null if the environment did not
   * specify any
   * @throws IOException if a location for credentials was defined, but
   * the credentials could not be loaded.
   */
  public static Credentials loadTokensFromEnvironment(Map<String, String> env,
      Configuration conf)
      throws IOException {
    StringBuffer origin = new StringBuffer();
    File file = locateEnvCredentials(env, conf, origin);
    if (file == null) {
      return null;
    }
    LOG.debug("Using {}", origin);
    return Credentials.readTokenStorageFile(file, conf);
  }

  /**
   * Save credentials to a file.
   * @param file file to save to (will be overwritten)
   * @param credentials credentials to write
   * @throws IOException if the file could not be opened or written
   */
  public static void saveTokens(File file,
      Credentials credentials) throws IOException {
    try (DataOutputStream daos = new DataOutputStream(
        new FileOutputStream(file))) {
      credentials.writeTokenStorageToStream(daos);
    }
  }

  /**
   * Look up and return the resource manager's principal. This method
   * automatically does the <code>_HOST</code> replacement in the principal and
   * correctly handles HA resource manager configurations.
   *
   * From: YARN-4629
   * @param conf the {@link Configuration} file from which to read the
   * principal
   * @return the resource manager's principal string
   * @throws IOException thrown if there's an error replacing the host name
   * @throws IllegalStateException if the RM principal is not set, or HA is
   * enabled but no RM HA IDs are configured
   */
  public static String getRMPrincipal(Configuration conf) throws IOException {
    String principal = conf.get(RM_PRINCIPAL, "");
    Preconditions.checkState(!principal.isEmpty(), "Not set: " + RM_PRINCIPAL);

    // configuration used to resolve the RM address; replaced by an
    // HA-specific copy when HA is enabled
    Configuration addressConf = conf;
    if (HAUtil.isHAEnabled(conf)) {
      YarnConfiguration yarnConf = new YarnConfiguration(conf);
      if (yarnConf.get(RM_HA_ID) == null) {
        // If RM_HA_ID is not configured, use the first of RM_HA_IDS.
        // Any valid RM HA ID should work.
        String[] rmIds = yarnConf.getStrings(RM_HA_IDS);
        Preconditions.checkState((rmIds != null) && (rmIds.length > 0),
            "Not set " + RM_HA_IDS);
        yarnConf.set(RM_HA_ID, rmIds[0]);
      }
      addressConf = yarnConf;
    }
    String hostname = addressConf.getSocketAddr(
        RM_ADDRESS,
        DEFAULT_RM_ADDRESS,
        DEFAULT_RM_PORT).getHostName();
    return SecurityUtil.getServerPrincipal(principal, hostname);
  }

  /**
   * Create and add any filesystem delegation tokens with
   * the RM(s) configured to be able to renew them. Returns null
   * on an insecure cluster (i.e. harmless)
   * @param conf configuration
   * @param fs filesystem
   * @param credentials credentials to update
   * @return a list of all added tokens, or null if security is not enabled
   * @throws IOException if the RM principal could not be determined or the
   * filesystem failed to issue the tokens
   */
  public static Token<?>[] addRMRenewableFSDelegationTokens(Configuration conf,
      FileSystem fs,
      Credentials credentials) throws IOException {
    Preconditions.checkArgument(conf != null);
    Preconditions.checkArgument(credentials != null);
    if (!UserGroupInformation.isSecurityEnabled()) {
      return null;
    }
    return fs.addDelegationTokens(CredentialUtils.getRMPrincipal(conf),
        credentials);
  }

  /**
   * Add an FS delegation token which can be renewed by the current user
   * @param fs filesystem
   * @param credentials credentials to update
   * @throws IOException problems.
   */
  public static void addSelfRenewableFSDelegationTokens(
      FileSystem fs,
      Credentials credentials) throws IOException {
    Preconditions.checkArgument(fs != null);
    Preconditions.checkArgument(credentials != null);
    fs.addDelegationTokens(
        getSelfRenewer(),
        credentials);
  }

  /**
   * Get the renewer name used for self-renewable tokens: the short user
   * name of the login user.
   * @return the short name of the login user
   * @throws IOException if the login user could not be determined
   */
  public static String getSelfRenewer() throws IOException {
    return UserGroupInformation.getLoginUser().getShortUserName();
  }

  /**
   * Create and add an RM delegation token to the credentials
   * @param yarnClient Yarn Client
   * @param credentials to add token to
   * @return the token which was added
   * @throws IOException
   * @throws YarnException
   */
  public static Token<TokenIdentifier> addRMDelegationToken(YarnClient yarnClient,
      Credentials credentials)
      throws IOException, YarnException {
    Configuration conf = yarnClient.getConfig();
    Text rmPrincipal = new Text(CredentialUtils.getRMPrincipal(conf));
    Text rmDTService = ClientRMProxy.getRMDelegationTokenService(conf);
    Token<TokenIdentifier> rmDelegationToken =
        ConverterUtils.convertFromYarn(
            yarnClient.getRMDelegationToken(rmPrincipal),
            rmDTService);
    credentials.addToken(rmDelegationToken.getService(), rmDelegationToken);
    return rmDelegationToken;
  }

  /**
   * If the timeline service is enabled in the configuration
   * ({@link YarnConfiguration#TIMELINE_SERVICE_ENABLED}, treated as false
   * when unset), request a timeline delegation token, passing the RM
   * principal to the timeline client, and add it to the credentials.
   * The timeline client is created, started and closed within this call.
   * @param conf configuration
   * @param credentials credentials to update
   * @return the token which was added, or null if the timeline service
   * is disabled
   * @throws IOException failure to resolve the RM principal or get the token
   * @throws YarnException failure reported by the timeline client
   */
  public static Token<TimelineDelegationTokenIdentifier> maybeAddTimelineToken(
      Configuration conf,
      Credentials credentials)
      throws IOException, YarnException {
    if (!conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false)) {
      LOG.debug("Timeline service is disabled");
      return null;
    }
    LOG.debug("Timeline service enabled -fetching token");
    try (TimelineClient timelineClient = TimelineClient.createTimelineClient()) {
      timelineClient.init(conf);
      timelineClient.start();
      Token<TimelineDelegationTokenIdentifier> token =
          timelineClient.getDelegationToken(
              CredentialUtils.getRMPrincipal(conf));
      credentials.addToken(token.getService(), token);
      return token;
    }
  }

  /**
   * Filter a list of tokens from a set of credentials.
   * @param credentials credential source; it is copied into a new credential
   * set before any tokens are removed
   * @param filter list of token kinds to strip out
   * @return a new, filtered, set of credentials
   */
  public static Credentials filterTokens(Credentials credentials,
      List<Text> filter) {
    Credentials result = new Credentials(credentials);
    Iterator<Token<? extends TokenIdentifier>> iter =
        result.getAllTokens().iterator();
    while (iter.hasNext()) {
      Text kind = iter.next().getKind();
      LOG.debug("Token {}", kind);
      if (filter.contains(kind)) {
        LOG.debug("Filtering token {}", kind);
        iter.remove();
      }
    }
    return result;
  }

  /**
   * Build a string listing every token in the credentials, sorted by
   * token kind, with each entry followed by the separator.
   * @param credentials credentials to list
   * @param separator text appended after every token, including the last
   * @return the string form of all tokens; empty if there are none
   */
  public static String dumpTokens(Credentials credentials, String separator) {
    List<Token<? extends TokenIdentifier>> sorted =
        new ArrayList<>(credentials.getAllTokens());
    Collections.sort(sorted, new TokenComparator());
    StringBuilder buffer = new StringBuilder(sorted.size() * 128);
    for (Token<? extends TokenIdentifier> token : sorted) {
      buffer.append(tokenToString(token)).append(separator);
    }
    return buffer.toString();
  }

  /**
   * Create a string for people to look at. If the token identifier cannot
   * be decoded, the failure is logged at debug level and only the token's
   * own string form is returned.
   * @param token token to convert to a string form
   * @return a printable view of the token
   */
  public static String tokenToString(Token<? extends TokenIdentifier> token) {
    DateFormat df = DateFormat.getDateTimeInstance(
        DateFormat.SHORT, DateFormat.SHORT);
    StringBuilder buffer = new StringBuilder(128);
    buffer.append(token.toString());
    try {
      TokenIdentifier ti = token.decodeIdentifier();
      buffer.append("; ").append(ti);
      if (ti instanceof AbstractDelegationTokenIdentifier) {
        // details in human readable form, and compensate for information HDFS DT omits
        AbstractDelegationTokenIdentifier dt = (AbstractDelegationTokenIdentifier) ti;
        buffer.append("; Renewer: ").append(dt.getRenewer());
        buffer.append("; Issued: ")
            .append(df.format(new Date(dt.getIssueDate())));
        buffer.append("; Max Date: ")
            .append(df.format(new Date(dt.getMaxDate())));
      }
    } catch (IOException e) {
      //marshall problem; not ours
      LOG.debug("Failed to decode {}: {}", token, e, e);
    }
    return buffer.toString();
  }

  /**
   * Get the expiry time of a token, taken from the max date of its
   * delegation token identifier.
   * @param token token to examine
   * @return the time in milliseconds after which the token is invalid.
   * @throws IOException if the token identifier could not be decoded
   * @throws IllegalStateException if the decoded identifier is not an
   * {@link AbstractDelegationTokenIdentifier}
   */
  public static long getTokenExpiryTime(Token<? extends TokenIdentifier> token)
      throws IOException {
    TokenIdentifier identifier = token.decodeIdentifier();
    Preconditions.checkState(identifier instanceof AbstractDelegationTokenIdentifier,
        "Token %s of type: %s has an identifier which cannot be examined: %s",
        token, token.getClass(), identifier);
    AbstractDelegationTokenIdentifier id =
        (AbstractDelegationTokenIdentifier) identifier;
    return id.getMaxDate();
  }

  /**
   * Orders tokens by the string value of their kind.
   */
  private static class TokenComparator
      implements Comparator<Token<? extends TokenIdentifier>>, Serializable {
    @Override
    public int compare(Token<? extends TokenIdentifier> left,
        Token<? extends TokenIdentifier> right) {
      return left.getKind().toString().compareTo(right.getKind().toString());
    }
  }
}