/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.client;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
import org.apache.flink.runtime.yarn.FlinkYarnClusterStatus;
import org.apache.flink.util.InstantiationUtil;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
/**
* Class handling the command line interface to the YARN session.
*/
public class FlinkYarnSessionCli {
private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnSessionCli.class);
//------------------------------------ Constants -------------------------
private static final String CONFIG_FILE_NAME = "flink-conf.yaml";
public static final String CONFIG_FILE_LOGBACK_NAME = "logback.xml";
public static final String CONFIG_FILE_LOG4J_NAME = "log4j.properties";
private static final int CLIENT_POLLING_INTERVAL = 3; // polling interval in seconds
//------------------------------------ Command Line argument options -------------------------
// the prefix transformation is used by the CliFrontend static constructor.
private final Option QUERY;
// --- or ---
private final Option QUEUE;
private final Option SHIP_PATH;
private final Option FLINK_JAR;
private final Option JM_MEMORY;
private final Option TM_MEMORY;
private final Option CONTAINER;
private final Option SLOTS;
private final Option DETACHED;
private final Option STREAMING;
private final Option NAME;
/**
* Dynamic properties allow the user to specify additional configuration values with -D, such as
* -Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368
*/
private final Option DYNAMIC_PROPERTIES;
//------------------------------------ Internal fields -------------------------
private AbstractFlinkYarnCluster yarnCluster = null;
private boolean detachedMode = false;
/** Default YARN application name. */
private String defaultApplicationName = null;
public FlinkYarnSessionCli(String shortPrefix, String longPrefix) {
QUERY = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)");
QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
SHIP_PATH = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory (t for transfer)");
FLINK_JAR = new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file");
JM_MEMORY = new Option(shortPrefix + "jm", longPrefix + "jobManagerMemory", true, "Memory for JobManager Container [in MB]");
TM_MEMORY = new Option(shortPrefix + "tm", longPrefix + "taskManagerMemory", true, "Memory per TaskManager Container [in MB]");
CONTAINER = new Option(shortPrefix + "n", longPrefix + "container", true, "Number of YARN containers to allocate (= number of TaskManagers)");
SLOTS = new Option(shortPrefix + "s", longPrefix + "slots", true, "Number of slots per TaskManager");
DYNAMIC_PROPERTIES = new Option(shortPrefix + "D", true, "Dynamic properties");
DETACHED = new Option(shortPrefix + "d", longPrefix + "detached", false, "Start detached");
STREAMING = new Option(shortPrefix + "st", longPrefix + "streaming", false, "Start Flink in streaming mode");
NAME = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name for the application on YARN");
}
/**
* Creates a new Yarn Client.
* @param cmd the command line to parse options from
* @return an instance of the client or null if there was an error
*/
public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) {
AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient();
if (flinkYarnClient == null) {
return null;
}
if (!cmd.hasOption(CONTAINER.getOpt())) { // number of containers is required option!
LOG.error("Missing required argument " + CONTAINER.getOpt());
printUsage();
return null;
}
flinkYarnClient.setTaskManagerCount(Integer.valueOf(cmd.getOptionValue(CONTAINER.getOpt())));
// Jar Path
Path localJarPath;
if (cmd.hasOption(FLINK_JAR.getOpt())) {
String userPath = cmd.getOptionValue(FLINK_JAR.getOpt());
if (!userPath.startsWith("file://")) {
userPath = "file://" + userPath;
}
localJarPath = new Path(userPath);
} else {
LOG.info("No path for the flink jar passed. Using the location of "+flinkYarnClient.getClass()+" to locate the jar");
localJarPath = new Path("file://"+flinkYarnClient.getClass().getProtectionDomain().getCodeSource().getLocation().getPath());
}
flinkYarnClient.setLocalJarPath(localJarPath);
// Conf Path
String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv();
GlobalConfiguration.loadConfiguration(confDirPath);
Configuration flinkConfiguration = GlobalConfiguration.getConfiguration();
flinkYarnClient.setFlinkConfigurationObject(flinkConfiguration);
flinkYarnClient.setConfigurationDirectory(confDirPath);
File confFile = new File(confDirPath + File.separator + CONFIG_FILE_NAME);
if (!confFile.exists()) {
LOG.error("Unable to locate configuration file in "+confFile);
return null;
}
Path confPath = new Path(confFile.getAbsolutePath());
flinkYarnClient.setConfigurationFilePath(confPath);
List<File> shipFiles = new ArrayList<>();
// path to directory to ship
if (cmd.hasOption(SHIP_PATH.getOpt())) {
String shipPath = cmd.getOptionValue(SHIP_PATH.getOpt());
File shipDir = new File(shipPath);
if (shipDir.isDirectory()) {
shipFiles = new ArrayList<>(Arrays.asList(shipDir.listFiles(new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return !(name.equals(".") || name.equals(".."));
}
})));
} else {
LOG.warn("Ship directory is not a directory. Ignoring it.");
}
}
//check if there is a logback or log4j file
if (confDirPath.length() > 0) {
File logback = new File(confDirPath + File.separator + CONFIG_FILE_LOGBACK_NAME);
if (logback.exists()) {
shipFiles.add(logback);
flinkYarnClient.setFlinkLoggingConfigurationPath(new Path(logback.toURI()));
}
File log4j = new File(confDirPath + File.separator + CONFIG_FILE_LOG4J_NAME);
if (log4j.exists()) {
shipFiles.add(log4j);
if (flinkYarnClient.getFlinkLoggingConfigurationPath() != null) {
// a Logback configuration file has already been registered --> warn; the log4j file takes precedence
LOG.warn("The configuration directory ('" + confDirPath + "') contains both LOG4J and " +
"Logback configuration files. Please delete or rename one of them.");
}
flinkYarnClient.setFlinkLoggingConfigurationPath(new Path(log4j.toURI()));
}
}
flinkYarnClient.setShipFiles(shipFiles);
// queue
if (cmd.hasOption(QUEUE.getOpt())) {
flinkYarnClient.setQueue(cmd.getOptionValue(QUEUE.getOpt()));
}
// JobManager Memory
if (cmd.hasOption(JM_MEMORY.getOpt())) {
int jmMemory = Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt()));
flinkYarnClient.setJobManagerMemory(jmMemory);
}
// Task Managers memory
if (cmd.hasOption(TM_MEMORY.getOpt())) {
int tmMemory = Integer.valueOf(cmd.getOptionValue(TM_MEMORY.getOpt()));
flinkYarnClient.setTaskManagerMemory(tmMemory);
}
if (cmd.hasOption(SLOTS.getOpt())) {
int slots = Integer.valueOf(cmd.getOptionValue(SLOTS.getOpt()));
flinkYarnClient.setTaskManagerSlots(slots);
}
String[] dynamicProperties = null;
if (cmd.hasOption(DYNAMIC_PROPERTIES.getOpt())) {
dynamicProperties = cmd.getOptionValues(DYNAMIC_PROPERTIES.getOpt());
}
String dynamicPropertiesEncoded = StringUtils.join(dynamicProperties,
CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR);
flinkYarnClient.setDynamicPropertiesEncoded(dynamicPropertiesEncoded);
if (cmd.hasOption(DETACHED.getOpt())) {
this.detachedMode = true;
flinkYarnClient.setDetachedMode(detachedMode);
}
if (cmd.hasOption(NAME.getOpt())) {
flinkYarnClient.setName(cmd.getOptionValue(NAME.getOpt()));
} else if (defaultApplicationName != null) {
// fall back to the default application name, if one has been configured
flinkYarnClient.setName(defaultApplicationName);
}
return flinkYarnClient;
}
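/** Prints the required and optional command line options of the YARN session CLI. */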
private void printUsage() {
System.out.println("Usage:");
HelpFormatter formatter = new HelpFormatter();
formatter.setWidth(200);
formatter.setLeftPadding(5);
formatter.setSyntaxPrefix(" Required");
Options req = new Options();
req.addOption(CONTAINER);
formatter.printHelp(" ", req);
formatter.setSyntaxPrefix(" Optional");
Options opt = new Options();
opt.addOption(JM_MEMORY);
opt.addOption(TM_MEMORY);
opt.addOption(QUERY);
opt.addOption(QUEUE);
opt.addOption(SLOTS);
opt.addOption(DYNAMIC_PROPERTIES);
opt.addOption(DETACHED);
opt.addOption(STREAMING);
opt.addOption(NAME);
formatter.printHelp(" ", opt);
}
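/**
 * Loads and instantiates the FlinkYarnClient implementation via reflection, so that this class
 * does not depend on the YARN classes at compile time.
 * @return the instantiated client, or null if the class is not on the classpath
 *         (for example in a Flink build without Hadoop2/YARN support)
 */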
public static AbstractFlinkYarnClient getFlinkYarnClient() {
AbstractFlinkYarnClient yarnClient;
try {
Class<? extends AbstractFlinkYarnClient> yarnClientClass =
Class.forName("org.apache.flink.yarn.FlinkYarnClient").asSubclass(AbstractFlinkYarnClient.class);
yarnClient = InstantiationUtil.instantiate(yarnClientClass, AbstractFlinkYarnClient.class);
}
catch (ClassNotFoundException e) {
System.err.println("Unable to locate the Flink YARN Client. " +
"Please ensure that you are using a Flink build with Hadoop2/YARN support. Message: " +
e.getMessage());
e.printStackTrace(System.err);
return null; // make it obvious
}
return yarnClient;
}
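/** Stores the given properties in the YARN properties file and makes the file readable for all users. */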
private static void writeYarnProperties(Properties properties, File propertiesFile) {
try (OutputStream out = new FileOutputStream(propertiesFile)) {
properties.store(out, "Generated YARN properties file");
} catch (IOException e) {
throw new RuntimeException("Error writing the properties file", e);
}
propertiesFile.setReadable(true, false); // readable for all users
}
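/**
 * Interactive command line loop: polls the YARN cluster for status changes and new messages,
 * shuts the cluster down if it has failed, and reacts to the user commands "help" and
 * "stop"/"quit" read from stdin until the session is stopped.
 */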
public static void runInteractiveCli(AbstractFlinkYarnCluster yarnCluster) {
final String HELP = "Available commands:\n" +
"help - show these commands\n" +
"stop - stop the YARN session";
int numTaskmanagers = 0;
try {
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
label:
while (true) {
// ------------------ check if there are updates by the cluster -----------
FlinkYarnClusterStatus status = yarnCluster.getClusterStatus();
if (status != null && numTaskmanagers != status.getNumberOfTaskManagers()) {
System.err.println("Number of connected TaskManagers changed to " +
status.getNumberOfTaskManagers() + ". Slots available: " + status.getNumberOfSlots());
numTaskmanagers = status.getNumberOfTaskManagers();
}
List<String> messages = yarnCluster.getNewMessages();
if (messages != null && messages.size() > 0) {
System.err.println("New messages from the YARN cluster: ");
for (String msg : messages) {
System.err.println(msg);
}
}
if (yarnCluster.hasFailed()) {
System.err.println("The YARN cluster has failed");
yarnCluster.shutdown(true);
}
// wait until CLIENT_POLLING_INTERVAL seconds have passed or the user has entered something.
long startTime = System.currentTimeMillis();
while ((System.currentTimeMillis() - startTime) < CLIENT_POLLING_INTERVAL * 1000
&& !in.ready()) {
Thread.sleep(200);
}
//------------- handle interactive command by user. ----------------------
if (in.ready()) {
String command = in.readLine();
switch (command) {
case "quit":
case "stop":
break label;
case "help":
System.err.println(HELP);
break;
default:
System.err.println("Unknown command '" + command + "'. Showing help: \n" + HELP);
break;
}
}
if (yarnCluster.hasBeenStopped()) {
LOG.info("Stopping interactive command line interface, YARN cluster has been stopped.");
break;
}
}
} catch(Exception e) {
LOG.warn("Exception while running the interactive command line interface", e);
}
}
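/** Command line entry point; no option prefixes are used when the YARN session CLI is started directly. */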
public static void main(String[] args) {
FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", ""); // no prefix for the YARN session
System.exit(cli.run(args));
}
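/** Adds all command line options of the YARN session CLI to the given {@link Options} instance. */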
public void getYARNSessionCLIOptions(Options options) {
options.addOption(FLINK_JAR);
options.addOption(JM_MEMORY);
options.addOption(TM_MEMORY);
options.addOption(CONTAINER);
options.addOption(QUEUE);
options.addOption(QUERY);
options.addOption(SHIP_PATH);
options.addOption(SLOTS);
options.addOption(DYNAMIC_PROPERTIES);
options.addOption(DETACHED);
options.addOption(STREAMING);
options.addOption(NAME);
}
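/**
 * Parses the given arguments and either queries the YARN cluster for its available resources
 * (query option) or deploys a new Flink YARN session, writes the YARN properties file and,
 * unless the session runs in detached mode, starts the interactive command line loop.
 * @param args the command line arguments of the YARN session CLI
 * @return 0 on success, 1 on failure
 */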
public int run(String[] args) {
//
// Command Line Options
//
Options options = new Options();
getYARNSessionCLIOptions(options);
CommandLineParser parser = new PosixParser();
CommandLine cmd;
try {
cmd = parser.parse(options, args);
} catch(Exception e) {
System.out.println(e.getMessage());
printUsage();
return 1;
}
// Query cluster for metrics
if (cmd.hasOption(QUERY.getOpt())) {
AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient();
String description;
try {
description = flinkYarnClient.getClusterDescription();
} catch (Exception e) {
System.err.println("Error while querying the YARN cluster for available resources: "+e.getMessage());
e.printStackTrace(System.err);
return 1;
}
System.out.println(description);
return 0;
} else {
AbstractFlinkYarnClient flinkYarnClient = createFlinkYarnClient(cmd);
if (flinkYarnClient == null) {
System.err.println("Error while starting the YARN Client. Please check log output!");
return 1;
}
try {
yarnCluster = flinkYarnClient.deploy();
// only connect to the cluster if it is not a detached session.
if (!flinkYarnClient.isDetached()) {
yarnCluster.connectToCluster();
}
} catch (Exception e) {
System.err.println("Error while deploying YARN cluster: "+e.getMessage());
e.printStackTrace(System.err);
return 1;
}
//------------------ Cluster deployed, handle connection details
String jobManagerAddress = yarnCluster.getJobManagerAddress().getAddress().getHostAddress() + ":" + yarnCluster.getJobManagerAddress().getPort();
System.out.println("Flink JobManager is now running on " + jobManagerAddress);
System.out.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL());
// write a properties file containing the JobManager address and the degree of parallelism of the session.
String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir");
String currentUser = System.getProperty("user.name");
String propertiesFileLocation = yarnCluster.getFlinkConfiguration().getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation);
File yarnPropertiesFile = new File(propertiesFileLocation + File.separator + CliFrontend.YARN_PROPERTIES_FILE + currentUser);
Properties yarnProps = new Properties();
yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_JOBMANAGER_KEY, jobManagerAddress);
if (flinkYarnClient.getTaskManagerSlots() != -1) {
String parallelism =
Integer.toString(flinkYarnClient.getTaskManagerSlots() * flinkYarnClient.getTaskManagerCount());
yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_PARALLELISM, parallelism);
}
// add dynamic properties
if (flinkYarnClient.getDynamicPropertiesEncoded() != null) {
yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING,
flinkYarnClient.getDynamicPropertiesEncoded());
}
writeYarnProperties(yarnProps, yarnPropertiesFile);
//------------------ Cluster running, let user control it ------------
if (detachedMode) {
// print info and quit:
LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
"yarn application -kill "+yarnCluster.getApplicationId()+"\n" +
"Please also note that the temporary files of the YARN session in {} will not be removed.",
flinkYarnClient.getSessionFilesDir());
} else {
runInteractiveCli(yarnCluster);
if (!yarnCluster.hasBeenStopped()) {
LOG.info("Command Line Interface requested session shutdown");
yarnCluster.shutdown(false);
}
try {
yarnPropertiesFile.delete();
} catch (Exception e) {
LOG.warn("Exception while deleting the JobManager address file", e);
}
}
}
return 0;
}
/**
 * Sets the default YARN application name.
 * @param defaultApplicationName the name of the YARN application to use
 * @return this FlinkYarnSessionCli instance, for chaining
 */
public FlinkYarnSessionCli withDefaultApplicationName(String defaultApplicationName) {
this.defaultApplicationName = defaultApplicationName;
return this;
}
/**
* Utility method for tests.
*/
public void stop() {
if (yarnCluster != null) {
LOG.info("Command line interface is shutting down the yarnCluster");
yarnCluster.shutdown(false);
}
}
}