blob: 6fbbf930ef9991a64a029738c811340ae4cdc374 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.twill.yarn;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import com.google.common.io.ByteStreams;
import com.google.common.io.OutputSupplier;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import joptsimple.OptionSpec;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.twill.api.ClassAcceptor;
import org.apache.twill.api.Configs;
import org.apache.twill.api.EventHandlerSpecification;
import org.apache.twill.api.LocalFile;
import org.apache.twill.api.RunId;
import org.apache.twill.api.RuntimeSpecification;
import org.apache.twill.api.SecureStore;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillPreparer;
import org.apache.twill.api.TwillSpecification;
import org.apache.twill.api.logging.LogEntry;
import org.apache.twill.api.logging.LogHandler;
import org.apache.twill.filesystem.Location;
import org.apache.twill.internal.ApplicationBundler;
import org.apache.twill.internal.Arguments;
import org.apache.twill.internal.Constants;
import org.apache.twill.internal.DefaultLocalFile;
import org.apache.twill.internal.DefaultRuntimeSpecification;
import org.apache.twill.internal.DefaultTwillSpecification;
import org.apache.twill.internal.EnvKeys;
import org.apache.twill.internal.JvmOptions;
import org.apache.twill.internal.LogOnlyEventHandler;
import org.apache.twill.internal.ProcessController;
import org.apache.twill.internal.ProcessLauncher;
import org.apache.twill.internal.TwillRuntimeSpecification;
import org.apache.twill.internal.appmaster.ApplicationMasterInfo;
import org.apache.twill.internal.appmaster.ApplicationMasterMain;
import org.apache.twill.internal.container.TwillContainerMain;
import org.apache.twill.internal.io.LocationCache;
import org.apache.twill.internal.json.ArgumentsCodec;
import org.apache.twill.internal.json.JvmOptionsCodec;
import org.apache.twill.internal.json.LocalFileCodec;
import org.apache.twill.internal.json.TwillRuntimeSpecificationAdapter;
import org.apache.twill.internal.utils.Dependencies;
import org.apache.twill.internal.utils.Paths;
import org.apache.twill.internal.utils.Resources;
import org.apache.twill.internal.yarn.YarnAppClient;
import org.apache.twill.internal.yarn.YarnApplicationReport;
import org.apache.twill.internal.yarn.YarnUtils;
import org.apache.twill.launcher.FindFreePort;
import org.apache.twill.launcher.TwillLauncher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
/**
* Implementation for {@link TwillPreparer} to prepare and launch distributed application on Hadoop YARN.
*/
final class YarnTwillPreparer implements TwillPreparer {
private static final Logger LOG = LoggerFactory.getLogger(YarnTwillPreparer.class);
private static final Function<Class<?>, String> CLASS_TO_NAME = new Function<Class<?>, String>() {
@Override
public String apply(Class<?> cls) {
return cls.getName();
}
};
private final YarnConfiguration yarnConfig;
private final TwillSpecification twillSpec;
private final YarnAppClient yarnAppClient;
private final String zkConnectString;
private final Location appLocation;
private final YarnTwillControllerFactory controllerFactory;
private final RunId runId;
private final List<LogHandler> logHandlers = Lists.newArrayList();
private final List<String> arguments = Lists.newArrayList();
private final Set<Class<?>> dependencies = Sets.newIdentityHashSet();
private final List<URI> resources = Lists.newArrayList();
private final List<String> classPaths = Lists.newArrayList();
private final ListMultimap<String, String> runnableArgs = ArrayListMultimap.create();
private final Map<String, Map<String, String>> environments = Maps.newHashMap();
private final List<String> applicationClassPaths = Lists.newArrayList();
private final Credentials credentials;
private final int reservedMemory;
private final double minHeapRatio;
private final File localStagingDir;
private final Map<String, Map<String, String>> logLevels = Maps.newHashMap();
private final LocationCache locationCache;
private final Set<URL> twillClassPaths;
private String schedulerQueue;
private String extraOptions;
private JvmOptions.DebugOptions debugOptions = JvmOptions.DebugOptions.NO_DEBUG;
private ClassAcceptor classAcceptor;
private final Map<String, Integer> maxRetries = Maps.newHashMap();
YarnTwillPreparer(YarnConfiguration yarnConfig, TwillSpecification twillSpec, RunId runId,
YarnAppClient yarnAppClient, String zkConnectString, Location appLocation, Set<URL> twillClassPaths,
String extraOptions, LocationCache locationCache, YarnTwillControllerFactory controllerFactory) {
this.yarnConfig = yarnConfig;
this.twillSpec = twillSpec;
this.runId = runId;
this.yarnAppClient = yarnAppClient;
this.zkConnectString = zkConnectString;
this.appLocation = appLocation;
this.controllerFactory = controllerFactory;
this.credentials = createCredentials();
this.reservedMemory = yarnConfig.getInt(Configs.Keys.JAVA_RESERVED_MEMORY_MB,
Configs.Defaults.JAVA_RESERVED_MEMORY_MB);
// doing this way to support hadoop-2.0 profile
String minHeapRatioStr = yarnConfig.get(Configs.Keys.HEAP_RESERVED_MIN_RATIO);
this.minHeapRatio = (minHeapRatioStr == null) ?
Configs.Defaults.HEAP_RESERVED_MIN_RATIO : Double.parseDouble(minHeapRatioStr);
this.localStagingDir = new File(yarnConfig.get(Configs.Keys.LOCAL_STAGING_DIRECTORY,
Configs.Defaults.LOCAL_STAGING_DIRECTORY));
this.extraOptions = extraOptions;
this.classAcceptor = new ClassAcceptor();
this.locationCache = locationCache;
this.twillClassPaths = twillClassPaths;
// By default, the root logger is having INFO log level
setLogLevel(LogEntry.Level.INFO);
}
private void confirmRunnableName(String runnableName) {
Preconditions.checkNotNull(runnableName);
Preconditions.checkArgument(twillSpec.getRunnables().containsKey(runnableName),
"Runnable %s is not defined in the application.", runnableName);
}
@Override
public TwillPreparer addLogHandler(LogHandler handler) {
logHandlers.add(handler);
return this;
}
@Override
public TwillPreparer setUser(String user) {
return this;
}
@Override
public TwillPreparer setSchedulerQueue(String name) {
this.schedulerQueue = name;
return this;
}
@Override
public TwillPreparer setJVMOptions(String options) {
this.extraOptions = options;
return this;
}
@Override
public TwillPreparer addJVMOptions(String options) {
this.extraOptions = extraOptions == null ? options : extraOptions + " " + options;
return this;
}
@Override
public TwillPreparer enableDebugging(String... runnables) {
return enableDebugging(false, runnables);
}
@Override
public TwillPreparer enableDebugging(boolean doSuspend, String... runnables) {
this.debugOptions = new JvmOptions.DebugOptions(true, doSuspend, ImmutableSet.copyOf(runnables));
return this;
}
@Override
public TwillPreparer withApplicationArguments(String... args) {
return withApplicationArguments(ImmutableList.copyOf(args));
}
@Override
public TwillPreparer withApplicationArguments(Iterable<String> args) {
Iterables.addAll(arguments, args);
return this;
}
@Override
public TwillPreparer withArguments(String runnableName, String... args) {
return withArguments(runnableName, ImmutableList.copyOf(args));
}
@Override
public TwillPreparer withArguments(String runnableName, Iterable<String> args) {
confirmRunnableName(runnableName);
runnableArgs.putAll(runnableName, args);
return this;
}
@Override
public TwillPreparer withDependencies(Class<?>... classes) {
return withDependencies(ImmutableList.copyOf(classes));
}
@Override
public TwillPreparer withDependencies(Iterable<Class<?>> classes) {
Iterables.addAll(dependencies, classes);
return this;
}
@Override
public TwillPreparer withResources(URI... resources) {
return withResources(ImmutableList.copyOf(resources));
}
@Override
public TwillPreparer withResources(Iterable<URI> resources) {
Iterables.addAll(this.resources, resources);
return this;
}
@Override
public TwillPreparer withClassPaths(String... classPaths) {
return withClassPaths(ImmutableList.copyOf(classPaths));
}
@Override
public TwillPreparer withClassPaths(Iterable<String> classPaths) {
Iterables.addAll(this.classPaths, classPaths);
return this;
}
@Override
public TwillPreparer withEnv(Map<String, String> env) {
// Add the given environments to all runnables
for (String runnableName : twillSpec.getRunnables().keySet()) {
setEnv(runnableName, env, false);
}
return this;
}
@Override
public TwillPreparer withEnv(String runnableName, Map<String, String> env) {
confirmRunnableName(runnableName);
setEnv(runnableName, env, true);
return this;
}
@Override
public TwillPreparer withApplicationClassPaths(String... classPaths) {
return withApplicationClassPaths(ImmutableList.copyOf(classPaths));
}
@Override
public TwillPreparer withApplicationClassPaths(Iterable<String> classPaths) {
Iterables.addAll(this.applicationClassPaths, classPaths);
return this;
}
@Override
public TwillPreparer withBundlerClassAcceptor(ClassAcceptor classAcceptor) {
this.classAcceptor = classAcceptor;
return this;
}
@Override
public TwillPreparer withMaxRetries(String runnableName, int maxRetries) {
confirmRunnableName(runnableName);
this.maxRetries.put(runnableName, maxRetries);
return this;
}
@Override
public TwillPreparer addSecureStore(SecureStore secureStore) {
Object store = secureStore.getStore();
Preconditions.checkArgument(store instanceof Credentials, "Only Hadoop Credentials is supported.");
this.credentials.mergeAll((Credentials) store);
return this;
}
@Override
public TwillPreparer setLogLevel(LogEntry.Level logLevel) {
return setLogLevels(ImmutableMap.of(Logger.ROOT_LOGGER_NAME, logLevel));
}
@Override
public TwillPreparer setLogLevels(Map<String, LogEntry.Level> logLevels) {
Preconditions.checkNotNull(logLevels);
for (String runnableName : twillSpec.getRunnables().keySet()) {
saveLogLevels(runnableName, logLevels);
}
return this;
}
@Override
public TwillPreparer setLogLevels(String runnableName, Map<String, LogEntry.Level> runnableLogLevels) {
confirmRunnableName(runnableName);
Preconditions.checkNotNull(runnableLogLevels);
Preconditions.checkArgument(!(logLevels.containsKey(Logger.ROOT_LOGGER_NAME)
&& logLevels.get(Logger.ROOT_LOGGER_NAME) == null));
saveLogLevels(runnableName, runnableLogLevels);
return this;
}
@Override
public TwillController start() {
return start(Constants.APPLICATION_MAX_START_SECONDS, TimeUnit.SECONDS);
}
@Override
public TwillController start(long timeout, TimeUnit timeoutUnit) {
try {
final ProcessLauncher<ApplicationMasterInfo> launcher = yarnAppClient.createLauncher(twillSpec, schedulerQueue);
final ApplicationMasterInfo appMasterInfo = launcher.getContainerInfo();
Callable<ProcessController<YarnApplicationReport>> submitTask =
new Callable<ProcessController<YarnApplicationReport>>() {
@Override
public ProcessController<YarnApplicationReport> call() throws Exception {
// Local files needed by AM
Map<String, LocalFile> localFiles = Maps.newHashMap();
createLauncherJar(localFiles);
createTwillJar(createBundler(classAcceptor), localFiles);
createApplicationJar(createApplicationJarBundler(classAcceptor), localFiles);
createResourcesJar(createBundler(classAcceptor), localFiles);
Path runtimeConfigDir = Files.createTempDirectory(localStagingDir.toPath(),
Constants.Files.RUNTIME_CONFIG_JAR);
try {
saveSpecification(twillSpec, runtimeConfigDir.resolve(Constants.Files.TWILL_SPEC));
saveLogback(runtimeConfigDir.resolve(Constants.Files.LOGBACK_TEMPLATE));
saveClassPaths(runtimeConfigDir);
saveJvmOptions(runtimeConfigDir.resolve(Constants.Files.JVM_OPTIONS));
saveArguments(new Arguments(arguments, runnableArgs),
runtimeConfigDir.resolve(Constants.Files.ARGUMENTS));
saveEnvironments(runtimeConfigDir.resolve(Constants.Files.ENVIRONMENTS));
createRuntimeConfigJar(runtimeConfigDir, localFiles);
} finally {
Paths.deleteRecursively(runtimeConfigDir);
}
createLocalizeFilesJson(localFiles);
LOG.debug("Submit AM container spec: {}", appMasterInfo);
// java -Djava.io.tmpdir=tmp -cp launcher.jar:$HADOOP_CONF_DIR -XmxMemory
// org.apache.twill.internal.TwillLauncher
// appMaster.jar
// org.apache.twill.internal.appmaster.ApplicationMasterMain
// false
int memory = Resources.computeMaxHeapSize(appMasterInfo.getMemoryMB(),
Constants.APP_MASTER_RESERVED_MEMORY_MB,
Constants.HEAP_MIN_RATIO);
return launcher.prepareLaunch(ImmutableMap.<String, String>of(), localFiles.values(), credentials)
.addCommand(
"$JAVA_HOME/bin/java",
"-Djava.io.tmpdir=tmp",
"-Dyarn.appId=$" + EnvKeys.YARN_APP_ID_STR,
"-Dtwill.app=$" + Constants.TWILL_APP_NAME,
"-cp", Constants.Files.LAUNCHER_JAR + ":$HADOOP_CONF_DIR",
"-Xmx" + memory + "m",
extraOptions == null ? "" : extraOptions,
TwillLauncher.class.getName(),
ApplicationMasterMain.class.getName(),
Boolean.FALSE.toString())
.launch();
}
};
YarnTwillController controller = controllerFactory.create(runId, logHandlers, submitTask, timeout, timeoutUnit);
controller.start();
return controller;
} catch (Exception e) {
LOG.error("Failed to submit application {}", twillSpec.getName(), e);
throw Throwables.propagate(e);
}
}
private void setEnv(String runnableName, Map<String, String> env, boolean overwrite) {
Map<String, String> environment = environments.get(runnableName);
if (environment == null) {
environment = new LinkedHashMap<>(env);
environments.put(runnableName, environment);
return;
}
for (Map.Entry<String, String> entry : env.entrySet()) {
if (overwrite || !environment.containsKey(entry.getKey())) {
environment.put(entry.getKey(), entry.getValue());
}
}
}
private void saveLogLevels(String runnableName, Map<String, LogEntry.Level> logLevels) {
Map<String, String> newLevels = new HashMap<>();
for (Map.Entry<String, LogEntry.Level> entry : logLevels.entrySet()) {
Preconditions.checkArgument(entry.getValue() != null, "Log level cannot be null for logger {}", entry.getKey());
newLevels.put(entry.getKey(), entry.getValue().name());
}
this.logLevels.put(runnableName, newLevels);
}
private Credentials createCredentials() {
Credentials credentials = new Credentials();
try {
credentials.addAll(UserGroupInformation.getCurrentUser().getCredentials());
List<Token<?>> tokens = YarnUtils.addDelegationTokens(yarnConfig, appLocation.getLocationFactory(), credentials);
if (LOG.isDebugEnabled()) {
for (Token<?> token : tokens) {
LOG.debug("Delegation token acquired for {}, {}", appLocation, token);
}
}
} catch (IOException e) {
LOG.warn("Failed to check for secure login type. Not gathering any delegation token.", e);
}
return credentials;
}
private LocalFile createLocalFile(String name, Location location) throws IOException {
return createLocalFile(name, location, false);
}
private LocalFile createLocalFile(String name, Location location, boolean archive) throws IOException {
return new DefaultLocalFile(name, location.toURI(), location.lastModified(), location.length(), archive, null);
}
private void createTwillJar(final ApplicationBundler bundler, Map<String, LocalFile> localFiles) throws IOException {
LOG.debug("Create and copy {}", Constants.Files.TWILL_JAR);
Location location = locationCache.get(Constants.Files.TWILL_JAR, new LocationCache.Loader() {
@Override
public void load(String name, Location targetLocation) throws IOException {
// Stuck in the yarnAppClient class to make bundler being able to pickup the right yarn-client version
bundler.createBundle(targetLocation, ApplicationMasterMain.class,
yarnAppClient.getClass(), TwillContainerMain.class, OptionSpec.class);
}
});
LOG.debug("Done {}", Constants.Files.TWILL_JAR);
localFiles.put(Constants.Files.TWILL_JAR, createLocalFile(Constants.Files.TWILL_JAR, location, true));
}
private void createApplicationJar(final ApplicationBundler bundler,
Map<String, LocalFile> localFiles) throws IOException {
try {
final Set<Class<?>> classes = Sets.newIdentityHashSet();
classes.addAll(dependencies);
ClassLoader classLoader = getClassLoader();
for (RuntimeSpecification spec : twillSpec.getRunnables().values()) {
classes.add(classLoader.loadClass(spec.getRunnableSpecification().getClassName()));
}
// Add the TwillRunnableEventHandler class
if (twillSpec.getEventHandler() != null) {
classes.add(getClassLoader().loadClass(twillSpec.getEventHandler().getClassName()));
}
// The location name is computed from the MD5 of all the classes names
// The localized name is always APPLICATION_JAR
List<String> classList = Lists.newArrayList(Iterables.transform(classes, CLASS_TO_NAME));
Collections.sort(classList);
Hasher hasher = Hashing.md5().newHasher();
for (String name : classList) {
hasher.putString(name);
}
// Only depends on class list so that it can be reused across different launches
String name = hasher.hash().toString() + "-" + Constants.Files.APPLICATION_JAR;
LOG.debug("Create and copy {}", Constants.Files.APPLICATION_JAR);
Location location = locationCache.get(name, new LocationCache.Loader() {
@Override
public void load(String name, Location targetLocation) throws IOException {
bundler.createBundle(targetLocation, classes);
}
});
LOG.debug("Done {}", Constants.Files.APPLICATION_JAR);
localFiles.put(Constants.Files.APPLICATION_JAR, createLocalFile(Constants.Files.APPLICATION_JAR, location, true));
} catch (ClassNotFoundException e) {
throw Throwables.propagate(e);
}
}
private void createResourcesJar(ApplicationBundler bundler, Map<String, LocalFile> localFiles) throws IOException {
// If there is no resources, no need to create the jar file.
if (resources.isEmpty()) {
return;
}
LOG.debug("Create and copy {}", Constants.Files.RESOURCES_JAR);
Location location = createTempLocation(Constants.Files.RESOURCES_JAR);
bundler.createBundle(location, Collections.<Class<?>>emptyList(), resources);
LOG.debug("Done {}", Constants.Files.RESOURCES_JAR);
localFiles.put(Constants.Files.RESOURCES_JAR, createLocalFile(Constants.Files.RESOURCES_JAR, location, true));
}
private void createRuntimeConfigJar(Path dir, Map<String, LocalFile> localFiles) throws IOException {
LOG.debug("Create and copy {}", Constants.Files.RUNTIME_CONFIG_JAR);
// Jar everything under the given directory, which contains different files needed by AM/runnable containers
Location location = createTempLocation(Constants.Files.RUNTIME_CONFIG_JAR);
try (
JarOutputStream jarOutput = new JarOutputStream(location.getOutputStream());
DirectoryStream<Path> stream = Files.newDirectoryStream(dir)
) {
for (Path path : stream) {
jarOutput.putNextEntry(new JarEntry(path.getFileName().toString()));
Files.copy(path, jarOutput);
jarOutput.closeEntry();
}
}
LOG.debug("Done {}", Constants.Files.RUNTIME_CONFIG_JAR);
localFiles.put(Constants.Files.RUNTIME_CONFIG_JAR,
createLocalFile(Constants.Files.RUNTIME_CONFIG_JAR, location, true));
}
/**
* Based on the given {@link TwillSpecification}, upload LocalFiles to Yarn Cluster.
* @param twillSpec The {@link TwillSpecification} for populating resource.
*/
private Multimap<String, LocalFile> populateRunnableLocalFiles(TwillSpecification twillSpec) throws IOException {
Multimap<String, LocalFile> localFiles = HashMultimap.create();
LOG.debug("Populating Runnable LocalFiles");
for (Map.Entry<String, RuntimeSpecification> entry: twillSpec.getRunnables().entrySet()) {
String runnableName = entry.getKey();
for (LocalFile localFile : entry.getValue().getLocalFiles()) {
Location location;
URI uri = localFile.getURI();
if (appLocation.toURI().getScheme().equals(uri.getScheme())) {
// If the source file location is having the same scheme as the target location, no need to copy
location = appLocation.getLocationFactory().create(uri);
} else {
URL url = uri.toURL();
LOG.debug("Create and copy {} : {}", runnableName, url);
// Preserves original suffix for expansion.
location = copyFromURL(url, createTempLocation(Paths.addExtension(url.getFile(), localFile.getName())));
LOG.debug("Done {} : {}", runnableName, url);
}
localFiles.put(runnableName,
new DefaultLocalFile(localFile.getName(), location.toURI(), location.lastModified(),
location.length(), localFile.isArchive(), localFile.getPattern()));
}
}
LOG.debug("Done Runnable LocalFiles");
return localFiles;
}
private void saveSpecification(TwillSpecification spec, Path targetFile) throws IOException {
final Multimap<String, LocalFile> runnableLocalFiles = populateRunnableLocalFiles(spec);
// Rewrite LocalFiles inside twillSpec
Map<String, RuntimeSpecification> runtimeSpec = Maps.transformEntries(
spec.getRunnables(), new Maps.EntryTransformer<String, RuntimeSpecification, RuntimeSpecification>() {
@Override
public RuntimeSpecification transformEntry(String key, RuntimeSpecification value) {
return new DefaultRuntimeSpecification(value.getName(), value.getRunnableSpecification(),
value.getResourceSpecification(), runnableLocalFiles.get(key));
}
});
// Serialize into a local temp file.
LOG.debug("Creating {}", targetFile);
try (Writer writer = Files.newBufferedWriter(targetFile, StandardCharsets.UTF_8)) {
EventHandlerSpecification eventHandler = spec.getEventHandler();
if (eventHandler == null) {
eventHandler = new LogOnlyEventHandler().configure();
}
TwillSpecification newTwillSpec = new DefaultTwillSpecification(spec.getName(), runtimeSpec, spec.getOrders(),
spec.getPlacementPolicies(), eventHandler);
TwillRuntimeSpecificationAdapter.create().toJson(
new TwillRuntimeSpecification(newTwillSpec, appLocation.getLocationFactory().getHomeLocation().getName(),
appLocation.toURI(), zkConnectString, runId, twillSpec.getName(),
reservedMemory, yarnConfig.get(YarnConfiguration.RM_SCHEDULER_ADDRESS),
logLevels, maxRetries, minHeapRatio), writer);
}
LOG.debug("Done {}", targetFile);
}
private void saveLogback(Path targetFile) throws IOException {
URL url = getClass().getClassLoader().getResource(Constants.Files.LOGBACK_TEMPLATE);
if (url == null) {
return;
}
LOG.debug("Creating {}", targetFile);
try (InputStream is = url.openStream()) {
Files.copy(is, targetFile);
}
LOG.debug("Done {}", targetFile);
}
/**
* Creates the launcher.jar for launch the main application.
*/
private void createLauncherJar(Map<String, LocalFile> localFiles) throws URISyntaxException, IOException {
LOG.debug("Create and copy {}", Constants.Files.LAUNCHER_JAR);
Location location = locationCache.get(Constants.Files.LAUNCHER_JAR, new LocationCache.Loader() {
@Override
public void load(String name, Location targetLocation) throws IOException {
// Create a jar file with the TwillLauncher and FindFreePort and dependent classes inside.
try (JarOutputStream jarOut = new JarOutputStream(targetLocation.getOutputStream())) {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
if (classLoader == null) {
classLoader = getClass().getClassLoader();
}
Dependencies.findClassDependencies(classLoader, new ClassAcceptor() {
@Override
public boolean accept(String className, URL classUrl, URL classPathUrl) {
try {
jarOut.putNextEntry(new JarEntry(className.replace('.', '/') + ".class"));
try (InputStream is = classUrl.openStream()) {
ByteStreams.copy(is, jarOut);
}
} catch (IOException e) {
throw Throwables.propagate(e);
}
return true;
}
}, TwillLauncher.class.getName(), FindFreePort.class.getName());
}
}
});
LOG.debug("Done {}", Constants.Files.LAUNCHER_JAR);
localFiles.put(Constants.Files.LAUNCHER_JAR, createLocalFile(Constants.Files.LAUNCHER_JAR, location));
}
private void saveClassPaths(Path targetDir) throws IOException {
Files.write(targetDir.resolve(Constants.Files.APPLICATION_CLASSPATH),
Joiner.on(':').join(applicationClassPaths).getBytes(StandardCharsets.UTF_8));
Files.write(targetDir.resolve(Constants.Files.CLASSPATH),
Joiner.on(':').join(classPaths).getBytes(StandardCharsets.UTF_8));
}
private void saveJvmOptions(final Path targetPath) throws IOException {
if ((extraOptions == null || extraOptions.isEmpty()) &&
JvmOptions.DebugOptions.NO_DEBUG.equals(debugOptions)) {
// If no vm options, no need to localize the file.
return;
}
LOG.debug("Creating {}", targetPath);
JvmOptionsCodec.encode(new JvmOptions(extraOptions, debugOptions), new OutputSupplier<Writer>() {
@Override
public Writer getOutput() throws IOException {
return Files.newBufferedWriter(targetPath, StandardCharsets.UTF_8);
}
});
LOG.debug("Done {}", targetPath);
}
private void saveArguments(Arguments arguments, final Path targetPath) throws IOException {
LOG.debug("Creating {}", targetPath);
ArgumentsCodec.encode(arguments, new OutputSupplier<Writer>() {
@Override
public Writer getOutput() throws IOException {
return Files.newBufferedWriter(targetPath, StandardCharsets.UTF_8);
}
});
LOG.debug("Done {}", targetPath);
}
private void saveEnvironments(Path targetPath) throws IOException {
if (environments.isEmpty()) {
return;
}
LOG.debug("Creating {}", targetPath);
try (Writer writer = Files.newBufferedWriter(targetPath, StandardCharsets.UTF_8)) {
new Gson().toJson(environments, writer);
}
LOG.debug("Done {}", targetPath);
}
/**
* Serializes the information for files that are localized to all YARN containers.
*/
private void createLocalizeFilesJson(Map<String, LocalFile> localFiles) throws IOException {
LOG.debug("Create and copy {}", Constants.Files.LOCALIZE_FILES);
Location location = createTempLocation(Constants.Files.LOCALIZE_FILES);
// Serialize the list of LocalFiles, except the one we are generating here, as this file is used by AM only.
// This file should never use LocationCache.
try (Writer writer = new OutputStreamWriter(location.getOutputStream(), StandardCharsets.UTF_8)) {
new GsonBuilder().registerTypeAdapter(LocalFile.class, new LocalFileCodec())
.create().toJson(localFiles.values(), new TypeToken<List<LocalFile>>() {
}.getType(), writer);
}
LOG.debug("Done {}", Constants.Files.LOCALIZE_FILES);
localFiles.put(Constants.Files.LOCALIZE_FILES, createLocalFile(Constants.Files.LOCALIZE_FILES, location));
}
private Location copyFromURL(URL url, Location target) throws IOException {
try (
InputStream is = url.openStream();
OutputStream os = new BufferedOutputStream(target.getOutputStream())
) {
ByteStreams.copy(is, os);
}
return target;
}
private Location createTempLocation(String fileName) {
String name;
String suffix = Paths.getExtension(fileName);
name = fileName.substring(0, fileName.length() - suffix.length() - 1);
try {
return appLocation.append(name).getTempFile('.' + suffix);
} catch (IOException e) {
throw Throwables.propagate(e);
}
}
/**
* Returns the context ClassLoader if there is any, otherwise, returns ClassLoader of this class.
*/
private ClassLoader getClassLoader() {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
return classLoader == null ? getClass().getClassLoader() : classLoader;
}
private ApplicationBundler createBundler(ClassAcceptor classAcceptor) {
return new ApplicationBundler(classAcceptor).setTempDir(localStagingDir);
}
/**
* Creates a {@link ApplicationBundler} for building application jar. The bundler will include classes
* accepted by the given {@link ClassAcceptor}, as long as it is not a twill class.
*/
private ApplicationBundler createApplicationJarBundler(final ClassAcceptor classAcceptor) {
// Accept classes based on the classAcceptor and also excluding all twill classes as they are already
// in the twill.jar
return createBundler(new ClassAcceptor() {
@Override
public boolean accept(String className, URL classUrl, URL classPathUrl) {
return !twillClassPaths.contains(classPathUrl) && classAcceptor.accept(className, classUrl, classPathUrl);
}
});
}
}