/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.client;

import java.util.List;

import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.api.common.Program;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.minicluster.NepheleMiniCluster;
import org.apache.flink.compiler.DataStatistics;
import org.apache.flink.compiler.PactCompiler;
import org.apache.flink.compiler.contextcheck.ContextChecker;
import org.apache.flink.compiler.dag.DataSinkNode;
import org.apache.flink.compiler.plan.OptimizedPlan;
import org.apache.flink.compiler.plandump.PlanJSONDumpGenerator;
import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.util.LogUtils;

import org.apache.log4j.Level;
/**
* A class for executing a {@link Plan} on a local embedded Flink runtime instance.
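*
* <p>A minimal usage sketch (illustrative only; {@code WordCount} stands for any {@link Program}
* implementation available on the classpath):
*
* <pre>{@code
* LocalExecutor executor = new LocalExecutor();
* executor.setTaskManagerNumSlots(4);
* executor.start();
* try {
*     JobExecutionResult result = executor.executePlan(new WordCount().getPlan("file:///input", "file:///output"));
*     System.out.println("Net runtime: " + result.getNetRuntime() + " ms");
* } finally {
*     executor.stop();
* }
* }</pre>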
*/
public class LocalExecutor extends PlanExecutor {
private static boolean DEFAULT_OVERWRITE = false;
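// -1 means: derive the number of slots from the plan's maximum parallelism at execution time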
private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = -1;
private final Object lock = new Object(); // we lock to ensure singleton execution
private NepheleMiniCluster nephele;
// ---------------------------------- config options ------------------------------------------
private int jobManagerRpcPort = -1;
private int taskManagerRpcPort = -1;
private int taskManagerDataPort = -1;
private int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
private String configDir;
private String hdfsConfigFile;
private boolean defaultOverwriteFiles = DEFAULT_OVERWRITE;
private boolean defaultAlwaysCreateDirectory = false;
// --------------------------------------------------------------------------------------------
public LocalExecutor() {
if (!ExecutionEnvironment.localExecutionIsAllowed()) {
throw new InvalidProgramException("The LocalEnvironment cannot be used when submitting a program through a client.");
}
if (System.getProperty("log4j.configuration") == null) {
setLoggingLevel(Level.INFO);
}
}
public int getJobManagerRpcPort() {
return jobManagerRpcPort;
}
public void setJobManagerRpcPort(int jobManagerRpcPort) {
this.jobManagerRpcPort = jobManagerRpcPort;
}
public int getTaskManagerRpcPort() {
return taskManagerRpcPort;
}
public void setTaskManagerRpcPort(int taskManagerRpcPort) {
this.taskManagerRpcPort = taskManagerRpcPort;
}
public int getTaskManagerDataPort() {
return taskManagerDataPort;
}
public void setTaskManagerDataPort(int taskManagerDataPort) {
this.taskManagerDataPort = taskManagerDataPort;
}
public String getConfigDir() {
return configDir;
}
public void setConfigDir(String configDir) {
this.configDir = configDir;
}
public String getHdfsConfig() {
return hdfsConfigFile;
}
public void setHdfsConfig(String hdfsConfig) {
this.hdfsConfigFile = hdfsConfig;
}
public boolean isDefaultOverwriteFiles() {
return defaultOverwriteFiles;
}
public void setDefaultOverwriteFiles(boolean defaultOverwriteFiles) {
this.defaultOverwriteFiles = defaultOverwriteFiles;
}
public boolean isDefaultAlwaysCreateDirectory() {
return defaultAlwaysCreateDirectory;
}
public void setDefaultAlwaysCreateDirectory(boolean defaultAlwaysCreateDirectory) {
this.defaultAlwaysCreateDirectory = defaultAlwaysCreateDirectory;
}
public void setTaskManagerNumSlots(int taskManagerNumSlots) {
this.taskManagerNumSlots = taskManagerNumSlots;
}
public int getTaskManagerNumSlots() {
return this.taskManagerNumSlots;
}
// --------------------------------------------------------------------------------------------
public void start() throws Exception {
synchronized (this.lock) {
if (this.nephele == null) {
// create the embedded runtime
this.nephele = new NepheleMiniCluster();
// configure it, if values were changed; otherwise the embedded runtime uses its internal defaults
if (jobManagerRpcPort > 0) {
nephele.setJobManagerRpcPort(jobManagerRpcPort);
}
if (taskManagerRpcPort > 0) {
nephele.setTaskManagerRpcPort(taskManagerRpcPort);
}
if (taskManagerDataPort > 0) {
nephele.setTaskManagerDataPort(taskManagerDataPort);
}
if (configDir != null) {
nephele.setConfigDir(configDir);
}
if (hdfsConfigFile != null) {
nephele.setHdfsConfigFile(hdfsConfigFile);
}
nephele.setDefaultOverwriteFiles(defaultOverwriteFiles);
nephele.setDefaultAlwaysCreateDirectory(defaultAlwaysCreateDirectory);
nephele.setTaskManagerNumSlots(taskManagerNumSlots);
// start it up
this.nephele.start();
} else {
throw new IllegalStateException("The local executor was already started.");
}
}
}
/**
* Stops the local executor instance. {@link #executePlan(Plan)} must not be called after the executor has been stopped.
*/
public void stop() throws Exception {
synchronized (this.lock) {
if (this.nephele != null) {
this.nephele.stop();
this.nephele = null;
} else {
throw new IllegalStateException("The local executor was not started.");
}
}
}
/**
* Executes the given plan on the local Nephele instance, waits for the job to
* finish, and returns the net runtime in milliseconds.
*
* @param plan The plan of the program to execute.
* @return The net runtime of the program, in milliseconds.
*
* @throws Exception Thrown, if either the startup of the local execution context or the
* execution of the program caused an exception.
*/
public JobExecutionResult executePlan(Plan plan) throws Exception {
if (plan == null) {
throw new IllegalArgumentException("The plan may not be null.");
}
ContextChecker checker = new ContextChecker();
checker.check(plan);
synchronized (this.lock) {
// check whether we need to start a session dedicated to this execution
final boolean shutDownAtEnd;
if (this.nephele == null) {
// we start a session just for us now
shutDownAtEnd = true;
// set the number of local slots to the maximum parallelism of the plan
if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) {
int maxParallelism = plan.getMaximumParallelism();
if (maxParallelism > 0) {
this.taskManagerNumSlots = maxParallelism;
}
}
start();
} else {
// we use the existing session
shutDownAtEnd = false;
}
try {
PactCompiler pc = new PactCompiler(new DataStatistics());
OptimizedPlan op = pc.compile(plan);
NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
JobGraph jobGraph = jgg.compileJobGraph(op);
JobClient jobClient = this.nephele.getJobClient(jobGraph);
JobExecutionResult result = jobClient.submitJobAndWait();
return result;
}
finally {
if (shutDownAtEnd) {
stop();
}
}
}
}
/**
* Returns a JSON dump of the optimized plan.
*
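* <p>Illustrative sketch, assuming {@code plan} is an already assembled {@link Plan}:
*
* <pre>{@code
* String json = new LocalExecutor().getOptimizerPlanAsJSON(plan);
* }</pre>
*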
* @param plan
* The program's plan.
* @return JSON dump of the optimized plan.
* @throws Exception Thrown, if the compilation of the plan caused an exception.
*/
public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
PactCompiler pc = new PactCompiler(new DataStatistics());
OptimizedPlan op = pc.compile(plan);
PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
return gen.getOptimizerPlanAsJSON(op);
}
// --------------------------------------------------------------------------------------------
// Static variants that internally bring up an instance and shut it down after the execution
// --------------------------------------------------------------------------------------------
/**
* Executes the program described by the given plan assembler.
*
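* <p>Illustrative one-shot sketch (a fresh local executor is brought up and torn down for the call;
* {@code WordCount} is a placeholder for any {@link Program} implementation):
*
* <pre>{@code
* JobExecutionResult result = LocalExecutor.execute(new WordCount(), "file:///input", "file:///output");
* }</pre>
*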
* @param pa The program's plan assembler.
* @param args The parameters.
* @return The net runtime of the program, in milliseconds.
*
* @throws Exception Thrown, if either the startup of the local execution context or the
* execution of the program caused an exception.
*/
public static JobExecutionResult execute(Program pa, String... args) throws Exception {
return execute(pa.getPlan(args));
}
/**
* Executes the program represented by the given Pact plan.
*
* @param plan The program's plan.
* @return The net runtime of the program, in milliseconds.
*
* @throws Exception Thrown, if either the startup of the local execution context or the
* execution of the program caused an exception.
*/
public static JobExecutionResult execute(Plan plan) throws Exception {
LocalExecutor exec = new LocalExecutor();
try {
exec.start();
return exec.executePlan(plan);
} finally {
exec.stop();
}
}
/**
* Returns a JSON dump of the optimized plan.
*
* @param plan
* The program's plan.
* @return JSON dump of the optimized plan.
* @throws Exception Thrown, if either the startup of the local execution context or the plan compilation caused an exception.
*/
public static String optimizerPlanAsJSON(Plan plan) throws Exception {
LocalExecutor exec = new LocalExecutor();
try {
exec.start();
PactCompiler pc = new PactCompiler(new DataStatistics());
OptimizedPlan op = pc.compile(plan);
PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
return gen.getOptimizerPlanAsJSON(op);
} finally {
exec.stop();
}
}
/**
* Returns the unoptimized plan as a JSON string.
*
* @param plan The program's plan.
* @return The unoptimized plan, rendered as a JSON string.
*/
public static String getPlanAsJSON(Plan plan) {
PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
List<DataSinkNode> sinks = PactCompiler.createPreOptimizedPlan(plan);
return gen.getPactPlanAsJSON(sinks);
}
/**
* Utility method for setting the log level of the default console logger.
*/
public static void setLoggingLevel(Level lvl) {
LogUtils.initializeDefaultConsoleLogger(lvl);
}
/**
* Sets whether programs executed locally overwrite existing output files by default.
* By default, local environments do not overwrite existing files.
*
* NOTE: This method must be called prior to initializing the LocalExecutor or a
* {@link org.apache.flink.api.java.LocalEnvironment}.
*
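* <p>Illustrative ordering sketch (the flag is read when the executor is constructed):
*
* <pre>{@code
* LocalExecutor.setOverwriteFilesByDefault(true); // set before creating the executor
* LocalExecutor executor = new LocalExecutor();
* }</pre>
*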
* @param overwriteByDefault True to overwrite existing files by default, false to keep them.
*/
public static void setOverwriteFilesByDefault(boolean overwriteByDefault) {
DEFAULT_OVERWRITE = overwriteByDefault;
}
}