blob: aa038c12356393f1fa17b268e8d1e944c36ebb18 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.client.program;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.Random;
import java.util.jar.Attributes;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.Manifest;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.Program;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.compiler.PactCompiler;
import org.apache.flink.compiler.dag.DataSinkNode;
import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
/**
* This class encapsulates represents a program, packaged in a jar file. It supplies
* functionality to extract nested libraries, search for the program entry point, and extract
* a program plan.
*/
public class PackagedProgram {
/**
* Property name of the entry in JAR manifest file that describes the Flink specific entry point.
*/
public static final String MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS = "program-class";
/**
* Property name of the entry in JAR manifest file that describes the class with the main method.
*/
public static final String MANIFEST_ATTRIBUTE_MAIN_CLASS = "Main-Class";
// --------------------------------------------------------------------------------------------
private final File jarFile;
private final String[] args;
private final Program program;
private final Class<?> mainClass;
private final List<File> extractedTempLibraries;
private ClassLoader userCodeClassLoader;
private Plan plan;
/**
* Creates an instance that wraps the plan defined in the jar file using the given
* argument.
*
* @param jarFile
* The jar file which contains the plan and a Manifest which defines
* the program-class
* @param args
* Optional. The arguments used to create the pact plan, depend on
* implementation of the pact plan. See getDescription().
* @throws ProgramInvocationException
* This invocation is thrown if the Program can't be properly loaded. Causes
* may be a missing / wrong class or manifest files.
*/
public PackagedProgram(File jarFile, String... args) throws ProgramInvocationException {
this(jarFile, null, args);
}
/**
* Creates an instance that wraps the plan defined in the jar file using the given
* arguments. For generating the plan the class defined in the className parameter
* is used.
*
* @param jarFile
* The jar file which contains the plan.
* @param entryPointClassName
* Name of the class which generates the plan. Overrides the class defined
* in the jar file manifest
* @param args
* Optional. The arguments used to create the pact plan, depend on
* implementation of the pact plan. See getDescription().
* @throws ProgramInvocationException
* This invocation is thrown if the Program can't be properly loaded. Causes
* may be a missing / wrong class or manifest files.
*/
public PackagedProgram(File jarFile, String entryPointClassName, String... args) throws ProgramInvocationException {
if (jarFile == null) {
throw new IllegalArgumentException("The jar file must not be null.");
}
checkJarFile(jarFile);
this.jarFile = jarFile;
this.args = args == null ? new String[0] : args;
// if no entryPointClassName name was given, we try and look one up through the manifest
if (entryPointClassName == null) {
entryPointClassName = getEntryPointClassNameFromJar(jarFile);
}
// now that we have an entry point, we can extract the nested jar files (if any)
this.extractedTempLibraries = extractContainedLibaries(jarFile);
this.userCodeClassLoader = buildUserCodeClassLoader(jarFile, extractedTempLibraries, getClass().getClassLoader());
// load the entry point class
this.mainClass = loadMainClass(entryPointClassName, userCodeClassLoader);
// if the entry point is a program, instantiate the class and get the plan
if (Program.class.isAssignableFrom(this.mainClass)) {
Program prg = null;
try {
prg = InstantiationUtil.instantiate(this.mainClass.asSubclass(Program.class), Program.class);
} catch (Exception e) {
// validate that the class has a main method at least.
// the main method possibly instantiates the program properly
if (!hasMainMethod(mainClass)) {
throw new ProgramInvocationException("The given program class implements the " +
Program.class.getName() + " interface, but cannot be instantiated. " +
"It also declares no main(String[]) method as alternative entry point", e);
}
} catch (Throwable t) {
throw new ProgramInvocationException("Error while trying to instantiate program class.", t);
}
this.program = prg;
} else if (hasMainMethod(mainClass)) {
this.program = null;
} else {
throw new ProgramInvocationException("The given program class neither has a main(String[]) method, nor does it implement the " +
Program.class.getName() + " interface.");
}
}
public String[] getArguments() {
return this.args;
}
public String getMainClassName() {
return this.mainClass.getName();
}
public boolean isUsingInteractiveMode() {
return this.program == null;
}
public boolean isUsingProgramEntryPoint() {
return this.program != null;
}
/**
* Returns the plan with all required jars.
*
* @return The plan with attached jar files.
* @throws ProgramInvocationException
*/
public JobWithJars getPlanWithJars() throws ProgramInvocationException {
if (isUsingProgramEntryPoint()) {
List<File> allJars = new ArrayList<File>();
allJars.add(jarFile);
allJars.addAll(extractedTempLibraries);
return new JobWithJars(getPlan(), allJars, userCodeClassLoader);
} else {
throw new ProgramInvocationException("Cannot create a " + JobWithJars.class.getSimpleName() +
" for a program that is using the interactive mode.");
}
}
/**
* Returns the analyzed plan without any optimizations.
*
* @return
* the analyzed plan without any optimizations.
* @throws ProgramInvocationException Thrown if an error occurred in the
* user-provided pact assembler. This may indicate
* missing parameters for generation.
*/
public String getPreviewPlan() throws ProgramInvocationException {
Thread.currentThread().setContextClassLoader(this.getUserCodeClassLoader());
List<DataSinkNode> previewPlan;
if (isUsingProgramEntryPoint()) {
previewPlan = PactCompiler.createPreOptimizedPlan(getPlan());
}
else if (isUsingInteractiveMode()) {
// temporary hack to support the web client
PreviewPlanEnvironment env = new PreviewPlanEnvironment();
env.setAsContext();
try {
ContextEnvironment.disableLocalExecution();
invokeInteractiveModeForExecution();
}
catch (ProgramInvocationException e) {
throw e;
}
catch (Throwable t) {
// the invocation gets aborted with the preview plan
if (env.previewPlan != null) {
previewPlan = env.previewPlan;
} else {
throw new ProgramInvocationException("The program caused an error: ", t);
}
}
if (env.previewPlan != null) {
previewPlan = env.previewPlan;
} else {
throw new ProgramInvocationException(
"The program plan could not be fetched. The program silently swallowed the control flow exceptions.");
}
}
else {
throw new RuntimeException();
}
PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
StringWriter string = new StringWriter(1024);
PrintWriter pw = null;
try {
pw = new PrintWriter(string);
jsonGen.dumpPactPlanAsJSON(previewPlan, pw);
} finally {
pw.close();
}
return string.toString();
}
/**
* Returns the description provided by the Program class. This
* may contain a description of the plan itself and its arguments.
*
* @return The description of the PactProgram's input parameters.
* @throws ProgramInvocationException
* This invocation is thrown if the Program can't be properly loaded. Causes
* may be a missing / wrong class or manifest files.
*/
public String getDescription() throws ProgramInvocationException {
if (ProgramDescription.class.isAssignableFrom(this.mainClass)) {
ProgramDescription descr;
if (this.program != null) {
descr = (ProgramDescription) this.program;
} else {
try {
descr = InstantiationUtil.instantiate(
this.mainClass.asSubclass(ProgramDescription.class), ProgramDescription.class);
} catch (Throwable t) {
return null;
}
}
try {
return descr.getDescription();
}
catch (Throwable t) {
throw new ProgramInvocationException("Error while getting the program description" +
(t.getMessage() == null ? "." : ": " + t.getMessage()), t);
}
} else {
return null;
}
}
/**
*
* This method assumes that the context environment is prepared, or the execution
* will be a local execution by default.
*/
public void invokeInteractiveModeForExecution() throws ProgramInvocationException{
if (isUsingInteractiveMode()) {
callMainMethod(mainClass, args);
} else {
throw new ProgramInvocationException("Cannot invoke a plan-based program directly.");
}
}
/**
* Gets the {@link java.lang.ClassLoader} that must be used to load user code classes.
*
* @return The user code ClassLoader.
*/
public ClassLoader getUserCodeClassLoader() {
return this.userCodeClassLoader;
}
public List<File> getAllLibraries() {
List<File> libs = new ArrayList<File>(this.extractedTempLibraries.size() + 1);
libs.add(jarFile);
libs.addAll(this.extractedTempLibraries);
return libs;
}
/**
* Deletes all temporary files created for contained packaged libraries.
*/
public void deleteExtractedLibraries() {
deleteExtractedLibraries(this.extractedTempLibraries);
this.extractedTempLibraries.clear();
}
/**
* Returns the plan as generated from the Pact Assembler.
*
* @return The program's plan.
* @throws ProgramInvocationException Thrown, if an error occurred in the program while
* creating the program's {@link Plan}.
*/
private Plan getPlan() throws ProgramInvocationException {
if (this.plan == null) {
Thread.currentThread().setContextClassLoader(this.userCodeClassLoader);
this.plan = createPlanFromProgram(this.program, this.args);
}
return this.plan;
}
private static boolean hasMainMethod(Class<?> entryClass) {
Method mainMethod;
try {
mainMethod = entryClass.getMethod("main", String[].class);
} catch (NoSuchMethodException e) {
return false;
}
catch (Throwable t) {
throw new RuntimeException("Could not look up the main(String[]) method from the class " +
entryClass.getName() + ": " + t.getMessage(), t);
}
return Modifier.isStatic(mainMethod.getModifiers()) && Modifier.isPublic(mainMethod.getModifiers());
}
private static void callMainMethod(Class<?> entryClass, String[] args) throws ProgramInvocationException {
Method mainMethod;
try {
mainMethod = entryClass.getMethod("main", String[].class);
} catch (NoSuchMethodException e) {
throw new ProgramInvocationException("The class " + entryClass.getName() + " has no main(String[]) method.");
}
catch (Throwable t) {
throw new ProgramInvocationException("Could not look up the main(String[]) method from the class " +
entryClass.getName() + ": " + t.getMessage(), t);
}
if (!Modifier.isStatic(mainMethod.getModifiers())) {
throw new ProgramInvocationException("The class " + entryClass.getName() + " declares a non-static main method.");
}
if (!Modifier.isPublic(mainMethod.getModifiers())) {
throw new ProgramInvocationException("The class " + entryClass.getName() + " declares a non-public main method.");
}
try {
mainMethod.invoke(null, (Object) args);
}
catch (IllegalArgumentException e) {
throw new ProgramInvocationException("Could not invoke the main method, arguments are not matching.", e);
}
catch (IllegalAccessException e) {
throw new ProgramInvocationException("Access to the main method was denied: " + e.getMessage(), e);
}
catch (InvocationTargetException e) {
Throwable exceptionInMethod = e.getTargetException();
if (exceptionInMethod instanceof Error) {
throw (Error) exceptionInMethod;
} else if (exceptionInMethod instanceof ProgramInvocationException) {
throw (ProgramInvocationException) exceptionInMethod;
} else {
throw new ProgramInvocationException("The main method caused an error.", exceptionInMethod);
}
}
catch (Throwable t) {
throw new ProgramInvocationException("An error occurred while invoking the program's main method: " + t.getMessage(), t);
}
}
private static String getEntryPointClassNameFromJar(File jarFile) throws ProgramInvocationException {
JarFile jar = null;
Manifest manifest = null;
String className = null;
// Open jar file
try {
jar = new JarFile(jarFile);
} catch (IOException ioex) {
throw new ProgramInvocationException("Error while opening jar file '" + jarFile.getPath() + "'. "
+ ioex.getMessage(), ioex);
}
// jar file must be closed at the end
try {
// Read from jar manifest
try {
manifest = jar.getManifest();
} catch (IOException ioex) {
throw new ProgramInvocationException("The Manifest in the jar file could not be accessed '"
+ jarFile.getPath() + "'. " + ioex.getMessage(), ioex);
}
if (manifest == null) {
throw new ProgramInvocationException("No manifest found in jar file '" + jarFile.getPath() + "'. The manifest is need to point to the program's main class.");
}
Attributes attributes = manifest.getMainAttributes();
// check for a "program-class" entry first
className = attributes.getValue(PackagedProgram.MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS);
if (className != null) {
return className;
}
// check for a main class
className = attributes.getValue(PackagedProgram.MANIFEST_ATTRIBUTE_MAIN_CLASS);
if (className != null) {
return className;
} else {
throw new ProgramInvocationException("Neither a '" + MANIFEST_ATTRIBUTE_MAIN_CLASS + "', nor a '" +
MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS + "' entry was found in the jar file.");
}
}
finally {
try {
jar.close();
} catch (Throwable t) {
throw new ProgramInvocationException("Could not close the JAR file: " + t.getMessage(), t);
}
}
}
private static Class<?> loadMainClass(String className, ClassLoader cl) throws ProgramInvocationException {
try {
return Class.forName(className, true, cl);
}
catch (ClassNotFoundException e) {
throw new ProgramInvocationException("The program's entry point class '" + className
+ "' was not found in the jar file.", e);
}
catch (ExceptionInInitializerError e) {
throw new ProgramInvocationException("The program's entry point class '" + className
+ "' threw an error during initialization.", e);
}
catch (LinkageError e) {
throw new ProgramInvocationException("The program's entry point class '" + className
+ "' could not be loaded due to a linkage failure.", e);
}
catch (Throwable t) {
throw new ProgramInvocationException("The program's entry point class '" + className
+ "' caused an exception during initialization: "+ t.getMessage(), t);
}
}
/**
* Takes the jar described by the given file and invokes its pact assembler class to
* assemble a plan. The assembler class name is either passed through a parameter,
* or it is read from the manifest of the jar. The assembler is handed the given options
* for its assembly.
*
* @param program The program to create the plan for.
* @param options
* The options for the assembler.
* @return The plan created by the program.
* @throws ProgramInvocationException
* Thrown, if an error occurred in the user-provided pact assembler.
*/
private static Plan createPlanFromProgram(Program program, String[] options) throws ProgramInvocationException {
try {
return program.getPlan(options);
} catch (Throwable t) {
throw new ProgramInvocationException("Error while calling the program: " + t.getMessage(), t);
}
}
/**
* Takes all JAR files that are contained in this program's JAR file and extracts them
* to the system's temp directory.
*
* @return The file names of the extracted temporary files.
* @throws ProgramInvocationException Thrown, if the extraction process failed.
*/
private static List<File> extractContainedLibaries(File jarFile) throws ProgramInvocationException {
Random rnd = new Random();
JarFile jar = null;
try {
jar = new JarFile(jarFile);
final List<JarEntry> containedJarFileEntries = new ArrayList<JarEntry>();
Enumeration<JarEntry> entries = jar.entries();
while (entries.hasMoreElements()) {
JarEntry entry = entries.nextElement();
String name = entry.getName();
if (name.length() > 8 && name.startsWith("lib/") && name.endsWith(".jar")) {
containedJarFileEntries.add(entry);
}
}
if (containedJarFileEntries.isEmpty()) {
return Collections.emptyList();
}
else {
// go over all contained jar files
final List<File> extractedTempLibraries = new ArrayList<File>(containedJarFileEntries.size());
final byte[] buffer = new byte[4096];
boolean incomplete = true;
try {
for (int i = 0; i < containedJarFileEntries.size(); i++) {
final JarEntry entry = containedJarFileEntries.get(i);
String name = entry.getName();
name = name.replace(File.separatorChar, '_');
File tempFile;
try {
tempFile = File.createTempFile(String.valueOf(Math.abs(rnd.nextInt()) + "_"), name);
tempFile.deleteOnExit();
}
catch (IOException e) {
throw new ProgramInvocationException(
"An I/O error occurred while creating temporary file to extract nested library '" +
entry.getName() + "'.", e);
}
extractedTempLibraries.add(tempFile);
// copy the temp file contents to a temporary File
OutputStream out = null;
InputStream in = null;
try {
out = new FileOutputStream(tempFile);
in = new BufferedInputStream(jar.getInputStream(entry));
int numRead = 0;
while ((numRead = in.read(buffer)) != -1) {
out.write(buffer, 0, numRead);
}
}
catch (IOException e) {
throw new ProgramInvocationException("An I/O error occurred while extracting nested library '"
+ entry.getName() + "' to temporary file '" + tempFile.getAbsolutePath() + "'.");
}
finally {
if (out != null) {
out.close();
}
if (in != null) {
in.close();
}
}
}
incomplete = false;
}
finally {
if (incomplete) {
deleteExtractedLibraries(extractedTempLibraries);
}
}
return extractedTempLibraries;
}
}
catch (Throwable t) {
throw new ProgramInvocationException("Unknown I/O error while extracting contained jar files.", t);
}
finally {
if (jar != null) {
try {
jar.close();
} catch (Throwable t) {}
}
}
}
private static void deleteExtractedLibraries(List<File> tempLibraries) {
for (File f : tempLibraries) {
f.delete();
}
}
private static ClassLoader buildUserCodeClassLoader(File mainJar, List<File> nestedJars, ClassLoader parent) throws ProgramInvocationException {
ArrayList<File> allJars = new ArrayList<File>(nestedJars.size() + 1);
allJars.add(mainJar);
allJars.addAll(nestedJars);
return JobWithJars.buildUserCodeClassLoader(allJars, parent);
}
private static void checkJarFile(File jarfile) throws ProgramInvocationException {
try {
JobWithJars.checkJarFile(jarfile);
}
catch (IOException e) {
throw new ProgramInvocationException(e.getMessage());
}
catch (Throwable t) {
throw new ProgramInvocationException("Cannot access jar file" + (t.getMessage() == null ? "." : ": " + t.getMessage()), t);
}
}
// --------------------------------------------------------------------------------------------
public static final class PreviewPlanEnvironment extends ExecutionEnvironment {
private List<DataSinkNode> previewPlan;
private Plan plan;
@Override
public JobExecutionResult execute(String jobName) throws Exception {
this.plan = createProgramPlan(jobName);
this.previewPlan = PactCompiler.createPreOptimizedPlan(plan);
// do not go on with anything now!
throw new Client.ProgramAbortException();
}
@Override
public String getExecutionPlan() throws Exception {
Plan plan = createProgramPlan("unused");
this.previewPlan = PactCompiler.createPreOptimizedPlan(plan);
// do not go on with anything now!
throw new Client.ProgramAbortException();
}
public void setAsContext() {
initializeContextEnvironment(this);
}
public Plan getPlan() {
return this.plan;
}
}
}