/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.apex;

import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;

import com.datatorrent.api.Attribute;
import com.datatorrent.api.Attribute.AttributeMap;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Serializable;
import java.lang.reflect.AccessibleObject;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.jar.JarFile;
import java.util.jar.Manifest;
import org.apache.apex.api.EmbeddedAppLauncher;
import org.apache.apex.api.Launcher;
import org.apache.apex.api.Launcher.AppHandle;
import org.apache.apex.api.Launcher.LaunchMode;
import org.apache.apex.api.Launcher.LauncherException;
import org.apache.apex.api.Launcher.ShutdownMode;
import org.apache.apex.api.YarnAppLauncher;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Proxy to launch the YARN application through the hadoop script, so that it runs in the
 * pre-configured environment (class path, configuration, native libraries, etc.).
*
* <p>The proxy takes the DAG and communicates with the Hadoop services to launch it on the cluster.
*/
public class ApexYarnLauncher {
private static final Logger LOG = LoggerFactory.getLogger(ApexYarnLauncher.class);
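
  /**
   * Launch the given application on YARN through the {@code hadoop} script.
   *
   * <p>A minimal usage sketch (the application body and properties shown here are illustrative):
   *
   * <pre>{@code
   * StreamingApplication app = (dag, conf) -> {
   *   // populate the DAG with operators and streams
   * };
   * AppHandle handle = new ApexYarnLauncher().launchApp(app, new Properties());
   * }</pre>
   *
   * @param app the application to launch
   * @param configProperties configuration properties for the launch
   * @return handle to the launched application
   * @throws IOException when the launch environment cannot be prepared
   */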
public AppHandle launchApp(StreamingApplication app, Properties configProperties)
throws IOException {
List<File> jarsToShip = getYarnDeployDependencies();
StringBuilder classpath = new StringBuilder();
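    // assemble the classpath of jars to ship; directory entries (e.g. build reactor
    // output) are first packed into temporary jar files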
for (File path : jarsToShip) {
if (path.isDirectory()) {
File tmpJar = File.createTempFile("beam-runners-apex-", ".jar");
createJar(path, tmpJar);
tmpJar.deleteOnExit();
path = tmpJar;
}
if (classpath.length() != 0) {
classpath.append(':');
}
classpath.append(path.getAbsolutePath());
}
EmbeddedAppLauncher<?> embeddedLauncher = Launcher.getLauncher(LaunchMode.EMBEDDED);
DAG dag = embeddedLauncher.getDAG();
app.populateDAG(dag, new Configuration(false));
Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
launchAttributes.put(YarnAppLauncher.LIB_JARS, classpath.toString().replace(':', ','));
LaunchParams lp = new LaunchParams(dag, launchAttributes, configProperties);
lp.cmd = "hadoop " + ApexYarnLauncher.class.getName();
HashMap<String, String> env = new HashMap<>();
env.put("HADOOP_USER_CLASSPATH_FIRST", "1");
env.put("HADOOP_CLASSPATH", classpath.toString());
lp.env = env;
return launchApp(lp);
}
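
  /**
   * Launch the application as specified by the given parameters. The parameters are serialized to
   * a temporary file; when no command is configured, the launch proceeds in-process through
   * {@link #main(String[])}, otherwise the configured command (typically the {@code hadoop}
   * script) is run in a child process with the location of that file as its argument.
   */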
protected AppHandle launchApp(LaunchParams params) throws IOException {
File tmpFile = File.createTempFile("beam-runner-apex", "params");
tmpFile.deleteOnExit();
try (FileOutputStream fos = new FileOutputStream(tmpFile)) {
SerializationUtils.serialize(params, fos);
}
if (params.getCmd() == null) {
ApexYarnLauncher.main(new String[] {tmpFile.getAbsolutePath()});
} else {
String cmd = params.getCmd() + " " + tmpFile.getAbsolutePath();
ByteArrayOutputStream consoleOutput = new ByteArrayOutputStream();
LOG.info("Executing: {} with {}", cmd, params.getEnv());
ProcessBuilder pb = new ProcessBuilder("bash", "-c", cmd);
Map<String, String> env = pb.environment();
env.putAll(params.getEnv());
Process p = pb.start();
ProcessWatcher pw = new ProcessWatcher(p);
InputStream output = p.getInputStream();
InputStream error = p.getErrorStream();
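      // continuously drain stdout and stderr so the child process cannot block on full pipes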
while (!pw.isFinished()) {
IOUtils.copy(output, consoleOutput);
IOUtils.copy(error, consoleOutput);
}
if (pw.rc != 0) {
String msg =
"The Beam Apex runner in non-embedded mode requires the Hadoop client"
+ " to be installed on the machine from which you launch the job"
+ " and the 'hadoop' script in $PATH";
LOG.error(msg);
throw new RuntimeException(
"Failed to run: "
+ cmd
+ " (exit code "
+ pw.rc
+ ")"
+ "\n"
+ consoleOutput.toString());
}
}
return new AppHandle() {
@Override
public boolean isFinished() {
// TODO (future PR): interaction with child process
LOG.warn("YARN application runs asynchronously and status check not implemented.");
return true;
}
@Override
public void shutdown(ShutdownMode arg0) throws LauncherException {
// TODO (future PR): interaction with child process
throw new UnsupportedOperationException();
}
};
}
  /**
   * From the current classpath, find the jar files that need to be deployed with the application
   * to run on YARN. Hadoop dependencies are provided through the Hadoop installation, so the
   * application should not bundle them, to avoid conflicts; the Hadoop compile dependencies and
   * their transitive dependencies are identified by parsing the Maven dependency tree and
   * excluded from the result.
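   *
   * <p>The {@code dependency-tree} resource is expected to hold {@code mvn dependency:tree} style
   * output; an illustrative fragment (the coordinates are examples, not the actual dependencies):
   *
   * <pre>
   * org.apache.beam:beam-runners-apex:jar:2.6.0
   * +- org.apache.hadoop:hadoop-common:jar:2.7.3:compile
   * |  \- com.google.guava:guava:jar:11.0.2:compile
   * \- joda-time:joda-time:jar:2.4:compile
   * </pre>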
*
* @return list of jar files to ship
* @throws IOException when dependency information cannot be read
*/
public static List<File> getYarnDeployDependencies() throws IOException {
    try (InputStream dependencyTree = ApexRunner.class.getResourceAsStream("dependency-tree")) {
      checkArgument(dependencyTree != null, "dependency-tree resource not found on the classpath");
try (BufferedReader br =
new BufferedReader(new InputStreamReader(dependencyTree, StandardCharsets.UTF_8))) {
String line;
List<String> excludes = new ArrayList<>();
int excludeLevel = Integer.MAX_VALUE;
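        // Parse mvn dependency:tree style output: the column of the first letter encodes the
        // nesting depth. Once an org.apache.hadoop artifact matches, every more deeply nested
        // line below it is one of its transitive dependencies and is excluded as well.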
while ((line = br.readLine()) != null) {
for (int i = 0; i < line.length(); i++) {
char c = line.charAt(i);
if (Character.isLetter(c)) {
if (i > excludeLevel) {
excludes.add(line.substring(i));
} else {
if (line.substring(i).startsWith("org.apache.hadoop")) {
excludeLevel = i;
excludes.add(line.substring(i));
} else {
excludeLevel = Integer.MAX_VALUE;
}
}
break;
}
}
}
Set<String> excludeJarFileNames = Sets.newHashSet();
for (String exclude : excludes) {
List<String> strings = Splitter.on(':').splitToList(exclude);
String[] mvnc = strings.toArray(new String[strings.size()]);
String fileName = mvnc[1] + "-";
if (mvnc.length == 6) {
fileName += mvnc[4] + "-" + mvnc[3]; // with classifier
} else {
fileName += mvnc[3];
}
fileName += ".jar";
excludeJarFileNames.add(fileName);
}
ClassLoader classLoader = ApexYarnLauncher.class.getClassLoader();
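      // assumes the application class loader is a URLClassLoader (the case on Java 8 and
      // earlier); on newer JVMs the classpath would have to be discovered differently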
URL[] urls = ((URLClassLoader) classLoader).getURLs();
List<File> dependencyJars = new ArrayList<>();
for (URL url : urls) {
File f = new File(url.getFile());
        // dependencies can also be directories in the build reactor;
        // the Apex client will automatically create jar files for those.
if (f.exists() && !excludeJarFileNames.contains(f.getName())) {
dependencyJars.add(f);
}
}
return dependencyJars;
}
}
}
/**
* Create a jar file from the given directory.
*
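   * <p>For example (the paths shown are illustrative):
   *
   * <pre>{@code
   * File classesDir = new File("target/classes");
   * File jar = File.createTempFile("app-", ".jar");
   * ApexYarnLauncher.createJar(classesDir, jar);
   * }</pre>
   *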
* @param dir source directory
* @param jarFile jar file name
* @throws IOException when file cannot be created
*/
public static void createJar(File dir, File jarFile) throws IOException {
final Map<String, ?> env = Collections.singletonMap("create", "true");
if (jarFile.exists() && !jarFile.delete()) {
throw new RuntimeException("Failed to remove " + jarFile);
}
URI uri = URI.create("jar:" + jarFile.toURI());
try (final FileSystem zipfs = FileSystems.newFileSystem(uri, env)) {
File manifestFile = new File(dir, JarFile.MANIFEST_NAME);
Files.createDirectory(zipfs.getPath("META-INF"));
try (final OutputStream out = Files.newOutputStream(zipfs.getPath(JarFile.MANIFEST_NAME))) {
if (!manifestFile.exists()) {
new Manifest().write(out);
} else {
Files.copy(manifestFile.toPath(), out);
}
}
final Path root = dir.toPath();
Files.walkFileTree(
root,
new java.nio.file.SimpleFileVisitor<Path>() {
String relativePath;
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
throws IOException {
relativePath = root.relativize(dir).toString();
if (!relativePath.isEmpty()) {
if (!relativePath.endsWith("/")) {
relativePath += "/";
}
if (!"META-INF/".equals(relativePath)) {
final Path dstDir = zipfs.getPath(relativePath);
Files.createDirectory(dstDir);
}
}
return super.preVisitDirectory(dir, attrs);
}
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs)
throws IOException {
String name = relativePath + file.getFileName();
if (!JarFile.MANIFEST_NAME.equals(name)) {
try (final OutputStream out = Files.newOutputStream(zipfs.getPath(name))) {
Files.copy(file, out);
}
}
return super.visitFile(file, attrs);
}
@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc)
throws IOException {
relativePath = root.relativize(dir.getParent()).toString();
if (!relativePath.isEmpty() && !relativePath.endsWith("/")) {
relativePath += "/";
}
return super.postVisitDirectory(dir, exc);
}
});
}
}
/** Transfer the properties to the configuration object. */
public static void addProperties(Configuration conf, Properties props) {
for (final String propertyName : props.stringPropertyNames()) {
String propertyValue = props.getProperty(propertyName);
conf.set(propertyName, propertyValue);
}
}
/**
 * The main method reads the serialized launch parameters, including the DAG, and launches the
 * YARN application.
*
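   * <p>When invoked through {@link #launchApp(LaunchParams)}, the effective command line looks
   * like the following (the temporary file path is illustrative):
   *
   * <pre>{@code
   * hadoop org.apache.beam.runners.apex.ApexYarnLauncher /tmp/beam-runner-apex1234params
   * }</pre>
   *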
* @param args location of launch parameters
* @throws IOException when parameters cannot be read
*/
public static void main(String[] args) throws IOException {
checkArgument(args.length == 1, "exactly one argument expected");
File file = new File(args[0]);
checkArgument(file.exists() && file.isFile(), "invalid file path %s", file);
    final LaunchParams params;
    try (FileInputStream fis = new FileInputStream(file)) {
      params = SerializationUtils.deserialize(fis);
    }
StreamingApplication apexApp = (dag, conf) -> copyShallow(params.dag, dag);
Configuration conf = new Configuration(); // configuration from Hadoop client
addProperties(conf, params.configProperties);
AppHandle appHandle =
params.getApexLauncher().launchApp(apexApp, conf, params.launchAttributes);
if (appHandle == null) {
      throw new AssertionError("Launch returned a null handle.");
}
    // TODO (future PR)
    // At this point the application is running, but this process should remain active so that
    // the parent can implement the runner result.
}
/** Launch parameters that will be serialized and passed to the child process. */
@VisibleForTesting
protected static class LaunchParams implements Serializable {
private static final long serialVersionUID = 1L;
private final DAG dag;
private final Attribute.AttributeMap launchAttributes;
private final Properties configProperties;
private HashMap<String, String> env;
private String cmd;
protected LaunchParams(DAG dag, AttributeMap launchAttributes, Properties configProperties) {
this.dag = dag;
this.launchAttributes = launchAttributes;
this.configProperties = configProperties;
}
protected Launcher<?> getApexLauncher() {
return Launcher.getLauncher(LaunchMode.YARN);
}
protected String getCmd() {
return cmd;
}
protected Map<String, String> getEnv() {
return env;
}
}
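
  /**
   * Reflectively copy all non-static fields from one DAG instance to another. Used in
   * {@link #main(String[])} to transfer the deserialized DAG from the launch parameters into the
   * DAG instance supplied by the launcher.
   */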
private static void copyShallow(DAG from, DAG to) {
checkArgument(
from.getClass() == to.getClass(),
"must be same class %s %s",
from.getClass(),
to.getClass());
Field[] fields = from.getClass().getDeclaredFields();
AccessibleObject.setAccessible(fields, true);
for (Field field : fields) {
if (!Modifier.isStatic(field.getModifiers())) {
try {
field.set(to, field.get(from));
} catch (IllegalArgumentException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}
}
}
  /** Watches a running process and records its exit code once it terminates. */
public static class ProcessWatcher implements Runnable {
private final Process p;
private volatile boolean finished = false;
private volatile int rc;
public ProcessWatcher(Process p) {
this.p = p;
new Thread(this).start();
}
public boolean isFinished() {
return finished;
}
@Override
public void run() {
try {
rc = p.waitFor();
} catch (Exception e) {
// ignore
}
finished = true;
}
}
}