/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.twill.internal.yarn;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.twill.api.LocalFile;
import org.apache.twill.filesystem.FileContextLocationFactory;
import org.apache.twill.filesystem.ForwardingLocationFactory;
import org.apache.twill.filesystem.LocationFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
/**
* Collection of helper methods to simplify YARN calls.
*/
public class YarnUtils {
/**
* Defines different versions of Hadoop.
*/
public enum HadoopVersions {
HADOOP_20,
HADOOP_21,
HADOOP_22,
HADOOP_23
}
private static final Logger LOG = LoggerFactory.getLogger(YarnUtils.class);
private static final AtomicReference<HadoopVersions> HADOOP_VERSION = new AtomicReference<>();
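  /**
   * Creates a {@link YarnLocalResource} from the given {@link LocalFile}, using
   * {@link LocalResourceVisibility#APPLICATION} visibility and a resource type (FILE, ARCHIVE or PATTERN)
   * derived from the file.
   */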
public static YarnLocalResource createLocalResource(LocalFile localFile) {
Preconditions.checkArgument(localFile.getLastModified() >= 0, "Last modified time should be >= 0.");
Preconditions.checkArgument(localFile.getSize() >= 0, "File size should be >= 0.");
YarnLocalResource resource = createAdapter(YarnLocalResource.class);
resource.setVisibility(LocalResourceVisibility.APPLICATION);
resource.setResource(ConverterUtils.getYarnUrlFromURI(localFile.getURI()));
resource.setTimestamp(localFile.getLastModified());
resource.setSize(localFile.getSize());
return setLocalResourceType(resource, localFile);
}
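  /**
   * Creates an empty {@link YarnLaunchContext}, backed by the adapter implementation matching the Hadoop
   * version detected on the classpath.
   */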
public static YarnLaunchContext createLaunchContext() {
return createAdapter(YarnLaunchContext.class);
}
  /**
   * Temporary workaround since older versions of Hadoop don't have the {@code getVirtualCores} method.
   * Returns {@code 0} if the method is not available.
   */
public static int getVirtualCores(Resource resource) {
try {
Method getVirtualCores = Resource.class.getMethod("getVirtualCores");
return (Integer) getVirtualCores.invoke(resource);
} catch (Exception e) {
return 0;
}
}
  /**
   * Temporary workaround since older versions of Hadoop don't have the {@code setVirtualCores} method.
   *
   * @param resource The {@link Resource} to update.
   * @param cores The number of virtual cores to set.
   * @return true if virtual cores were set, false if not.
   */
public static boolean setVirtualCores(Resource resource, int cores) {
try {
Method setVirtualCores = Resource.class.getMethod("setVirtualCores", int.class);
setVirtualCores.invoke(resource, cores);
} catch (Exception e) {
      // It's ok to ignore this exception, as it means an older version of the API is in use.
return false;
}
return true;
}
/**
* Creates {@link ApplicationId} from the given cluster timestamp and id.
*/
public static ApplicationId createApplicationId(long timestamp, int id) {
try {
try {
// For Hadoop-2.1
Method method = ApplicationId.class.getMethod("newInstance", long.class, int.class);
return (ApplicationId) method.invoke(null, timestamp, id);
} catch (NoSuchMethodException e) {
// Try with Hadoop-2.0 way
ApplicationId appId = Records.newRecord(ApplicationId.class);
Method setClusterTimestamp = ApplicationId.class.getMethod("setClusterTimestamp", long.class);
Method setId = ApplicationId.class.getMethod("setId", int.class);
setClusterTimestamp.invoke(appId, timestamp);
setId.invoke(appId, id);
return appId;
}
} catch (Exception e) {
throw Throwables.propagate(e);
}
}
  /**
   * Helper method to get delegation tokens for the given LocationFactory.
   *
   * @param config The Hadoop configuration.
   * @param locationFactory The {@link LocationFactory} for generating tokens.
   * @param credentials Credentials for storing the acquired tokens.
   * @return List of delegation tokens acquired.
   * @throws IOException if there is an error acquiring the tokens.
   */
public static List<Token<?>> addDelegationTokens(Configuration config,
LocationFactory locationFactory,
Credentials credentials) throws IOException {
if (!UserGroupInformation.isSecurityEnabled()) {
LOG.debug("Security is not enabled");
return ImmutableList.of();
}
FileSystem fileSystem = getFileSystem(locationFactory, config);
if (fileSystem == null) {
LOG.warn("Unexpected: LocationFactory is not backed by FileContextLocationFactory");
return ImmutableList.of();
}
String renewer = YarnUtils.getYarnTokenRenewer(config);
Token<?>[] tokens = fileSystem.addDelegationTokens(renewer, credentials);
LOG.debug("Added HDFS DelegationTokens: {}", Arrays.toString(tokens));
return tokens == null ? ImmutableList.<Token<?>>of() : ImmutableList.copyOf(tokens);
}
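  /*
   * A minimal usage sketch for addDelegationTokens (illustrative only; "hConf" and "locationFactory" stand
   * for the caller's Hadoop Configuration and LocationFactory). On a secure cluster, the acquired tokens are
   * typically serialized and shipped with the container launch context:
   *
   *   Credentials credentials = new Credentials();
   *   YarnUtils.addDelegationTokens(hConf, locationFactory, credentials);
   *   ByteBuffer serialized = YarnUtils.encodeCredentials(credentials);
   */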
  /**
   * Clones the HDFS delegation token to the individual NameNode hosts behind the same logical HA address.
   *
   * @param config The Hadoop configuration.
   * @throws IOException if it failed to get information about the current user.
   */
public static void cloneHaNnCredentials(Configuration config) throws IOException {
String scheme = URI.create(config.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT)).getScheme();
    // Loop through all name services. Each name service could have multiple NameNodes associated with it.
for (Map.Entry<String, Map<String, InetSocketAddress>> entry : DFSUtil.getHaNnRpcAddresses(config).entrySet()) {
String nsId = entry.getKey();
Map<String, InetSocketAddress> addressesInNN = entry.getValue();
if (!HAUtil.isHAEnabled(config, nsId) || addressesInNN == null || addressesInNN.isEmpty()) {
continue;
}
// The client may have a delegation token set for the logical
// URI of the cluster. Clone this token to apply to each of the
// underlying IPC addresses so that the IPC code can find it.
URI uri = URI.create(scheme + "://" + nsId);
LOG.info("Cloning delegation token for uri {}", uri);
HAUtil.cloneDelegationTokenForLogicalUri(UserGroupInformation.getCurrentUser(), uri, addressesInNN.values());
}
}
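  /*
   * Illustrative note: on an HDFS HA cluster this is typically invoked in the launched process after the
   * delegation tokens have been restored, so that a token obtained for the logical nameservice URI can also
   * be found for each physical NameNode RPC address, e.g.:
   *
   *   YarnUtils.cloneHaNnCredentials(hConf);  // "hConf" is the caller's Hadoop Configuration
   */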
/**
* Encodes the given {@link Credentials} as bytes.
*/
public static ByteBuffer encodeCredentials(Credentials credentials) {
try {
DataOutputBuffer out = new DataOutputBuffer();
credentials.writeTokenStorageToStream(out);
return ByteBuffer.wrap(out.getData(), 0, out.getLength());
} catch (IOException e) {
      // Shouldn't happen, as the stream writes to an in-memory buffer.
LOG.error("Failed to encode Credentials.", e);
throw Throwables.propagate(e);
}
}
/**
* Decodes {@link Credentials} from the given buffer.
* If the buffer is null or empty, it returns an empty Credentials.
*/
public static Credentials decodeCredentials(ByteBuffer buffer) throws IOException {
Credentials credentials = new Credentials();
if (buffer != null && buffer.hasRemaining()) {
DataInputByteBuffer in = new DataInputByteBuffer();
in.reset(buffer);
credentials.readTokenStorageStream(in);
}
return credentials;
}
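  /*
   * A minimal round-trip sketch (illustrative only): credentials encoded with encodeCredentials can be
   * restored later with decodeCredentials, for example after being passed through a container launch context.
   *
   *   ByteBuffer buffer = YarnUtils.encodeCredentials(credentials);
   *   Credentials restored = YarnUtils.decodeCredentials(buffer);
   */

  /**
   * Returns the Kerberos principal name of the YARN ResourceManager, used as the renewer when acquiring
   * delegation tokens.
   *
   * @param config The Hadoop configuration.
   * @throws IOException if no Kerberos principal is configured for the YARN RM.
   */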
public static String getYarnTokenRenewer(Configuration config) throws IOException {
String rmHost = getRMAddress(config).getHostName();
String renewer = SecurityUtil.getServerPrincipal(config.get(YarnConfiguration.RM_PRINCIPAL), rmHost);
if (renewer == null || renewer.length() == 0) {
throw new IOException("No Kerberos principal for Yarn RM to use as renewer");
}
return renewer;
}
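  /**
   * Returns the {@link InetSocketAddress} of the YARN ResourceManager, as given by
   * {@link YarnConfiguration#RM_ADDRESS} in the configuration.
   */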
public static InetSocketAddress getRMAddress(Configuration config) {
return config.getSocketAddr(YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_PORT);
}
  /**
   * Returns the {@link org.apache.twill.internal.yarn.YarnUtils.HadoopVersions} for the current build profile,
   * determined by which YARN classes are available on the classpath. The result is cached after the first call.
   *
   * @return The version of Hadoop detected from the classpath.
   */
public static HadoopVersions getHadoopVersion() {
HadoopVersions hadoopVersion = HADOOP_VERSION.get();
if (hadoopVersion != null) {
return hadoopVersion;
}
try {
Class.forName("org.apache.hadoop.yarn.client.api.NMClient");
try {
Class.forName("org.apache.hadoop.yarn.client.cli.LogsCLI");
try {
Class.forName("org.apache.hadoop.yarn.conf.HAUtil");
HADOOP_VERSION.set(HadoopVersions.HADOOP_23);
} catch (ClassNotFoundException e) {
HADOOP_VERSION.set(HadoopVersions.HADOOP_22);
}
} catch (ClassNotFoundException e) {
HADOOP_VERSION.set(HadoopVersions.HADOOP_21);
}
} catch (ClassNotFoundException e) {
HADOOP_VERSION.set(HadoopVersions.HADOOP_20);
}
return HADOOP_VERSION.get();
}
  /**
   * Helper method to create an adapter class instance for bridging API differences between Hadoop 2.0 and 2.1+.
   */
private static <T> T createAdapter(Class<T> clz) {
String className = clz.getPackage().getName();
if (getHadoopVersion().equals(HadoopVersions.HADOOP_20)) {
className += ".Hadoop20" + clz.getSimpleName();
} else {
className += ".Hadoop21" + clz.getSimpleName();
}
try {
//noinspection unchecked
return (T) Class.forName(className).newInstance();
} catch (Exception e) {
throw Throwables.propagate(e);
}
}
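  /**
   * Sets the {@link LocalResourceType} of the given resource, based on whether the {@link LocalFile} is an
   * archive and whether a pattern is set.
   */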
private static YarnLocalResource setLocalResourceType(YarnLocalResource localResource, LocalFile localFile) {
if (localFile.isArchive()) {
if (localFile.getPattern() == null) {
localResource.setType(LocalResourceType.ARCHIVE);
} else {
localResource.setType(LocalResourceType.PATTERN);
localResource.setPattern(localFile.getPattern());
}
} else {
localResource.setType(LocalResourceType.FILE);
}
return localResource;
}
  /**
   * Gets the Hadoop {@link FileSystem} from the given {@link LocationFactory}.
   *
   * @return the Hadoop {@link FileSystem} that represents the filesystem used by the given {@link LocationFactory};
   *         {@code null} will be returned if unable to determine the {@link FileSystem}.
   */
@Nullable
private static FileSystem getFileSystem(LocationFactory locationFactory, Configuration config) throws IOException {
if (locationFactory instanceof ForwardingLocationFactory) {
return getFileSystem(((ForwardingLocationFactory) locationFactory).getDelegate(), config);
}
    // Due to HDFS-10296, for encrypted file systems, FileContext does not acquire the KMS delegation token.
    // Since we know we are in YARN, it is safe to get the FileSystem directly, bypassing LocationFactory.
if (locationFactory instanceof FileContextLocationFactory) {
return FileSystem.get(config);
}
return null;
}
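  // Private constructor to prevent instantiation of this utility class.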
private YarnUtils() {
}
}