blob: 787f20b4ebc6305dc1729fb1130a9958a7c091c0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram.cli;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.lang.management.ManagementFactory;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.security.PrivilegedExceptionAction;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import javax.validation.constraints.NotNull;
import javax.ws.rs.core.MediaType;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.log4j.Appender;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.tools.ant.DirectoryScanner;
import com.google.common.base.Preconditions;
import com.sun.jersey.api.client.WebResource;
import com.datatorrent.api.DAG.GenericOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.stram.StramUtils;
import com.datatorrent.stram.client.AppPackage;
import com.datatorrent.stram.client.AppPackage.AppInfo;
import com.datatorrent.stram.client.AppPackage.PropertyInfo;
import com.datatorrent.stram.client.ConfigPackage;
import com.datatorrent.stram.client.DTConfiguration;
import com.datatorrent.stram.client.DTConfiguration.Scope;
import com.datatorrent.stram.client.RecordingsAgent;
import com.datatorrent.stram.client.RecordingsAgent.RecordingInfo;
import com.datatorrent.stram.client.StramAgent;
import com.datatorrent.stram.client.StramAppLauncher;
import com.datatorrent.stram.client.StramAppLauncher.AppFactory;
import com.datatorrent.stram.client.StramClientUtils;
import com.datatorrent.stram.client.StramClientUtils.ClientRMHelper;
import com.datatorrent.stram.codec.LogicalPlanSerializer;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
import com.datatorrent.stram.plan.logical.requests.AddStreamSinkRequest;
import com.datatorrent.stram.plan.logical.requests.CreateOperatorRequest;
import com.datatorrent.stram.plan.logical.requests.CreateStreamRequest;
import com.datatorrent.stram.plan.logical.requests.LogicalPlanRequest;
import com.datatorrent.stram.plan.logical.requests.RemoveOperatorRequest;
import com.datatorrent.stram.plan.logical.requests.RemoveStreamRequest;
import com.datatorrent.stram.plan.logical.requests.SetOperatorAttributeRequest;
import com.datatorrent.stram.plan.logical.requests.SetOperatorPropertyRequest;
import com.datatorrent.stram.plan.logical.requests.SetPortAttributeRequest;
import com.datatorrent.stram.plan.logical.requests.SetStreamAttributeRequest;
import com.datatorrent.stram.security.StramUserLogin;
import com.datatorrent.stram.util.JSONSerializationProvider;
import com.datatorrent.stram.util.LoggerUtil;
import com.datatorrent.stram.util.SecurityUtils;
import com.datatorrent.stram.util.VersionInfo;
import com.datatorrent.stram.util.WebServicesClient;
import com.datatorrent.stram.webapp.OperatorDiscoverer;
import com.datatorrent.stram.webapp.StramWebServices;
import com.datatorrent.stram.webapp.TypeDiscoverer;
import jline.console.ConsoleReader;
import jline.console.completer.AggregateCompleter;
import jline.console.completer.ArgumentCompleter;
import jline.console.completer.Completer;
import jline.console.completer.FileNameCompleter;
import jline.console.completer.StringsCompleter;
import jline.console.history.FileHistory;
import jline.console.history.History;
import jline.console.history.MemoryHistory;
import sun.misc.Signal;
import sun.misc.SignalHandler;
/**
* Provides command line interface for a streaming application on hadoop (yarn)
* <p>
*
* @since 0.3.2
*/
@SuppressWarnings("UseOfSystemOutOrSystemErr")
public class ApexCli
{
private static final Logger LOG = LoggerFactory.getLogger(ApexCli.class);
private static String CONFIG_EXCLUSIVE = "exclusive";
private static String CONFIG_INCLUSIVE = "inclusive";
private static final String COLOR_RED = "\033[38;5;196m";
private static final String COLOR_YELLOW = "\033[0;93m";
private static final String FORMAT_BOLD = "\033[1m";
private static final String COLOR_RESET = "\033[0m";
private static final String ITALICS = "\033[3m";
private static final String APEX_HIGHLIGHT_COLOR_PROPERTY_NAME = "apex.cli.color.highlight";
private static final String APEX_HIGHLIGHT_COLOR_ENV_VAR_NAME = "APEX_HIGHLIGHT_COLOR";
protected Configuration conf;
private FileSystem fs;
private StramAgent stramAgent;
private final YarnClient yarnClient = YarnClient.createYarnClient();
private ApplicationReport currentApp = null;
private boolean consolePresent;
private String[] commandsToExecute;
private final Map<String, CommandSpec> globalCommands = new TreeMap<>();
private final Map<String, CommandSpec> connectedCommands = new TreeMap<>();
private final Map<String, CommandSpec> logicalPlanChangeCommands = new TreeMap<>();
private final Map<String, String> aliases = new HashMap<>();
private final Map<String, List<String>> macros = new HashMap<>();
private boolean changingLogicalPlan = false;
private final List<LogicalPlanRequest> logicalPlanRequestQueue = new ArrayList<>();
private FileHistory topLevelHistory;
private FileHistory changingLogicalPlanHistory;
private String jsonp;
private boolean raw = false;
private RecordingsAgent recordingsAgent;
private final ObjectMapper mapper = new JSONSerializationProvider().getContext(null);
private String pagerCommand;
private Process pagerProcess;
private int verboseLevel = 0;
private final Tokenizer tokenizer = new Tokenizer();
private final Map<String, String> variableMap = new HashMap<>();
private static boolean lastCommandError = false;
private Thread mainThread;
private Thread commandThread;
private String prompt;
private String forcePrompt;
private String kerberosPrincipal;
private String kerberosKeyTab;
private String highlightColor = null;
private static class FileLineReader extends ConsoleReader
{
private final BufferedReader br;
FileLineReader(String fileName) throws IOException
{
super();
fileName = expandFileName(fileName, true);
br = new BufferedReader(new FileReader(fileName));
}
@Override
public String readLine(String prompt) throws IOException
{
return br.readLine();
}
@Override
public String readLine(String prompt, Character mask) throws IOException
{
return br.readLine();
}
@Override
public String readLine(Character mask) throws IOException
{
return br.readLine();
}
public void close() throws IOException
{
br.close();
}
}
public class Tokenizer
{
private void appendToCommandBuffer(List<String> commandBuffer, StringBuffer buf, boolean potentialEmptyArg)
{
if (potentialEmptyArg || buf.length() > 0) {
commandBuffer.add(buf.toString());
buf.setLength(0);
}
}
private List<String> startNewCommand(LinkedList<List<String>> resultBuffer)
{
List<String> newCommand = new ArrayList<>();
if (!resultBuffer.isEmpty()) {
List<String> lastCommand = resultBuffer.peekLast();
if (lastCommand.size() == 1) {
String first = lastCommand.get(0);
if (first.matches("^[A-Za-z][A-Za-z0-9]*=.*")) {
// This is a variable assignment
int equalSign = first.indexOf('=');
variableMap.put(first.substring(0, equalSign), first.substring(equalSign + 1));
resultBuffer.removeLast();
}
}
}
resultBuffer.add(newCommand);
return newCommand;
}
public List<String[]> tokenize(String commandLine)
{
LinkedList<List<String>> resultBuffer = new LinkedList<>();
List<String> commandBuffer = startNewCommand(resultBuffer);
if (commandLine != null) {
commandLine = ltrim(commandLine);
if (commandLine.startsWith("#")) {
return null;
}
int len = commandLine.length();
boolean insideQuotes = false;
boolean potentialEmptyArg = false;
StringBuffer buf = new StringBuffer(commandLine.length());
for (@SuppressWarnings("AssignmentToForLoopParameter") int i = 0; i < len; ++i) {
char c = commandLine.charAt(i);
if (c == '"') {
potentialEmptyArg = true;
insideQuotes = !insideQuotes;
} else if (c == '\\') {
if (len > i + 1) {
switch (commandLine.charAt(i + 1)) {
case 'n':
buf.append("\n");
break;
case 't':
buf.append("\t");
break;
case 'r':
buf.append("\r");
break;
case 'b':
buf.append("\b");
break;
case 'f':
buf.append("\f");
break;
default:
buf.append(commandLine.charAt(i + 1));
}
++i;
}
} else {
if (insideQuotes) {
buf.append(c);
} else {
if (c == '$') {
StringBuilder variableName = new StringBuilder(32);
if (len > i + 1) {
if (commandLine.charAt(i + 1) == '{') {
++i;
while (len > i + 1) {
char ch = commandLine.charAt(i + 1);
if (ch != '}') {
variableName.append(ch);
}
++i;
if (ch == '}') {
break;
}
if (len <= i + 1) {
throw new CliException("Parse error: unmatched brace");
}
}
} else if (commandLine.charAt(i + 1) == '?') {
++i;
buf.append(lastCommandError ? "1" : "0");
continue;
} else {
while (len > i + 1) {
char ch = commandLine.charAt(i + 1);
if ((variableName.length() > 0 && ch >= '0' && ch <= '9') || ((ch >= 'a' && ch <= 'z') || (ch >= 'A' && ch <= 'Z'))) {
variableName.append(ch);
} else {
break;
}
++i;
}
}
if (variableName.length() == 0) {
buf.append(c);
} else {
String value = variableMap.get(variableName.toString());
if (value != null) {
buf.append(value);
}
}
} else {
buf.append(c);
}
} else if (c == ';') {
appendToCommandBuffer(commandBuffer, buf, potentialEmptyArg);
commandBuffer = startNewCommand(resultBuffer);
} else if (Character.isWhitespace(c)) {
appendToCommandBuffer(commandBuffer, buf, potentialEmptyArg);
potentialEmptyArg = false;
if (len > i + 1 && commandLine.charAt(i + 1) == '#') {
break;
}
} else {
buf.append(c);
}
}
}
}
appendToCommandBuffer(commandBuffer, buf, potentialEmptyArg);
}
startNewCommand(resultBuffer);
List<String[]> result = new ArrayList<>();
for (List<String> command : resultBuffer) {
String[] commandArray = new String[command.size()];
result.add(command.toArray(commandArray));
}
return result;
}
}
private interface Command
{
void execute(String[] args, ConsoleReader reader) throws Exception;
}
private static class Arg
{
final String name;
Arg(String name)
{
this.name = name;
}
@Override
public String toString()
{
return name;
}
}
private static class FileArg extends Arg
{
FileArg(String name)
{
super(name);
}
}
// VarArg must be in optional argument and must be at the end
private static class VarArg extends Arg
{
VarArg(String name)
{
super(name);
}
}
private static class CommandArg extends Arg
{
CommandArg(String name)
{
super(name);
}
}
protected PrintStream suppressOutput()
{
PrintStream originalStream = System.out;
if (raw) {
PrintStream dummyStream = new PrintStream(new OutputStream()
{
@Override
public void write(int b)
{
// no-op
}
});
System.setOut(dummyStream);
}
return originalStream;
}
protected void restoreOutput(PrintStream originalStream)
{
if (raw) {
System.setOut(originalStream);
}
}
AppPackage newAppPackageInstance(URI uri, boolean suppressOutput) throws IOException
{
PrintStream outputStream = suppressOutput ? suppressOutput() : null;
try {
final String scheme = uri.getScheme();
if (scheme == null || scheme.equals("file")) {
return new AppPackage(new FileInputStream(new File(expandFileName(uri.getPath(), true))), true);
} else {
try (FileSystem fs = FileSystem.newInstance(uri, conf)) {
return new AppPackage(fs.open(new Path(uri.getPath())), true);
}
}
} finally {
if (outputStream != null) {
restoreOutput(outputStream);
}
}
}
AppPackage newAppPackageInstance(File f) throws IOException
{
PrintStream outputStream = suppressOutput();
try {
return new AppPackage(f, true);
} finally {
restoreOutput(outputStream);
}
}
@SuppressWarnings("unused")
private StramAppLauncher getStramAppLauncher(String jarfileUri, Configuration config, boolean ignorePom) throws Exception
{
URI uri = new URI(jarfileUri);
String scheme = uri.getScheme();
StramAppLauncher appLauncher = null;
if (scheme == null || scheme.equals("file")) {
File jf = new File(uri.getPath());
appLauncher = new StramAppLauncher(jf, config);
} else {
try (FileSystem tmpFs = FileSystem.newInstance(uri, conf)) {
Path path = new Path(uri.getPath());
appLauncher = new StramAppLauncher(tmpFs, path, config);
}
}
if (appLauncher != null) {
if (verboseLevel > 0) {
System.err.print(appLauncher.getMvnBuildClasspathOutput());
}
return appLauncher;
} else {
throw new CliException("Scheme " + scheme + " not supported.");
}
}
private static class CommandSpec
{
Command command;
Arg[] requiredArgs;
Arg[] optionalArgs;
String description;
CommandSpec(Command command, Arg[] requiredArgs, Arg[] optionalArgs, String description)
{
this.command = command;
this.requiredArgs = requiredArgs;
this.optionalArgs = optionalArgs;
this.description = description;
}
void verifyArguments(String[] args) throws CliException
{
int minArgs = 0;
int maxArgs = 0;
if (requiredArgs != null) {
minArgs = requiredArgs.length;
maxArgs = requiredArgs.length;
}
if (optionalArgs != null) {
for (Arg arg : optionalArgs) {
if (arg instanceof VarArg) {
maxArgs = Integer.MAX_VALUE;
break;
} else {
maxArgs++;
}
}
}
if (args.length - 1 < minArgs || args.length - 1 > maxArgs) {
throw new CliException("Command parameter error");
}
}
void printUsage(String cmd)
{
System.err.print("Usage: " + cmd);
if (requiredArgs != null) {
for (Arg arg : requiredArgs) {
System.err.print(" <" + arg + ">");
}
}
if (optionalArgs != null) {
for (Arg arg : optionalArgs) {
if (arg instanceof VarArg) {
System.err.print(" [<" + arg + "> ... ]");
} else {
System.err.print(" [<" + arg + ">]");
}
}
}
System.err.println();
}
}
private static class OptionsCommandSpec extends CommandSpec
{
Options options;
OptionsCommandSpec(Command command, Arg[] requiredArgs, Arg[] optionalArgs, String description, Options options)
{
super(command, requiredArgs, optionalArgs, description);
this.options = options;
}
@Override
void verifyArguments(String[] args) throws CliException
{
try {
args = new PosixParser().parse(options, args).getArgs();
super.verifyArguments(args);
} catch (Exception ex) {
throw new CliException("Command parameter error");
}
}
@Override
void printUsage(String cmd)
{
super.printUsage(cmd + ((options == null) ? "" : " [options]"));
if (options != null) {
System.out.println("Options:");
HelpFormatter formatter = new HelpFormatter();
PrintWriter pw = new PrintWriter(System.out);
formatter.printOptions(pw, 80, options, 4, 4);
pw.flush();
}
}
}
ApexCli()
{
//
// Global command specification starts here
//
globalCommands.put("help", new CommandSpec(new HelpCommand(),
null,
new Arg[]{new CommandArg("command")},
"Show help"));
globalCommands.put("echo", new CommandSpec(new EchoCommand(),
null, new Arg[]{new VarArg("arg")},
"Echo the arguments"));
globalCommands.put("connect", new CommandSpec(new ConnectCommand(),
new Arg[]{new Arg("app-id")},
null,
"Connect to an app"));
globalCommands.put("launch", new OptionsCommandSpec(new LaunchCommand(),
new Arg[]{},
new Arg[]{new FileArg("jar-file/json-file/properties-file/app-package-file-path/app-package-file-uri"), new Arg("matching-app-name")},
"Launch an app", LAUNCH_OPTIONS.options));
globalCommands.put("shutdown-app", new CommandSpec(new ShutdownAppCommand(),
new Arg[]{new Arg("app-id")},
new Arg[]{new VarArg("app-id")},
"Shutdown an app"));
globalCommands.put("list-apps", new CommandSpec(new ListAppsCommand(),
null,
new Arg[]{new Arg("pattern")},
"List applications"));
globalCommands.put("kill-app", new CommandSpec(new KillAppCommand(),
new Arg[]{new Arg("app-id/app-name")},
new Arg[]{new VarArg("app-id/app-name")},
"Kill an app"));
globalCommands.put("show-logical-plan", new OptionsCommandSpec(new ShowLogicalPlanCommand(),
new Arg[]{new FileArg("jar-file/app-package-file")},
new Arg[]{new Arg("class-name")},
"List apps in a jar or show logical plan of an app class",
getShowLogicalPlanCommandLineOptions()));
globalCommands.put("get-jar-operator-classes", new OptionsCommandSpec(new GetJarOperatorClassesCommand(),
new Arg[]{new FileArg("jar-files-comma-separated")},
new Arg[]{new Arg("search-term")},
"List operators in a jar list",
GET_OPERATOR_CLASSES_OPTIONS.options));
globalCommands.put("get-jar-operator-properties", new CommandSpec(new GetJarOperatorPropertiesCommand(),
new Arg[]{new FileArg("jar-files-comma-separated"), new Arg("operator-class-name")},
null,
"List properties in specified operator"));
globalCommands.put("alias", new CommandSpec(new AliasCommand(),
new Arg[]{new Arg("alias-name"), new CommandArg("command")},
null,
"Create a command alias"));
globalCommands.put("source", new CommandSpec(new SourceCommand(),
new Arg[]{new FileArg("file")},
null,
"Execute the commands in a file"));
globalCommands.put("exit", new CommandSpec(new ExitCommand(),
null,
null,
"Exit the CLI"));
globalCommands.put("begin-macro", new CommandSpec(new BeginMacroCommand(),
new Arg[]{new Arg("name")},
null,
"Begin Macro Definition ($1...$9 to access parameters and type 'end' to end the definition)"));
globalCommands.put("dump-properties-file", new CommandSpec(new DumpPropertiesFileCommand(),
new Arg[]{new FileArg("out-file"), new FileArg("jar-file"), new Arg("app-name")},
null,
"Dump the properties file of an app class"));
globalCommands.put("get-app-info", new CommandSpec(new GetAppInfoCommand(),
new Arg[]{new Arg("app-id")},
null,
"Get the information of an app"));
globalCommands.put("set-pager", new CommandSpec(new SetPagerCommand(),
new Arg[]{new Arg("on/off")},
null,
"Set the pager program for output"));
globalCommands.put("get-config-parameter", new CommandSpec(new GetConfigParameterCommand(),
null,
new Arg[]{new FileArg("parameter-name")},
"Get the configuration parameter"));
globalCommands.put("get-app-package-info", new OptionsCommandSpec(new GetAppPackageInfoCommand(),
new Arg[]{new FileArg("app-package-file-path/app-package-file-uri")},
new Arg[]{new Arg("-withDescription")},
"Get info on the app package file",
GET_APP_PACKAGE_INFO_OPTIONS));
globalCommands.put("get-app-package-operators", new OptionsCommandSpec(new GetAppPackageOperatorsCommand(),
new Arg[]{new FileArg("app-package-file-path/app-package-file-uri")},
new Arg[]{new Arg("search-term")},
"Get operators within the given app package",
GET_OPERATOR_CLASSES_OPTIONS.options));
globalCommands.put("get-app-package-operator-properties", new CommandSpec(new GetAppPackageOperatorPropertiesCommand(),
new Arg[]{new FileArg("app-package-file-path/app-package-file-uri"), new Arg("operator-class")},
null,
"Get operator properties within the given app package"));
globalCommands.put("list-default-app-attributes", new CommandSpec(new ListDefaultAttributesCommand(AttributesType.APPLICATION),
null, null, "Lists the default application attributes"));
globalCommands.put("list-default-operator-attributes", new CommandSpec(new ListDefaultAttributesCommand(AttributesType.OPERATOR),
null, null, "Lists the default operator attributes"));
globalCommands.put("list-default-port-attributes", new CommandSpec(new ListDefaultAttributesCommand(AttributesType.PORT),
null, null, "Lists the default port attributes"));
globalCommands.put("clean-app-directories", new CommandSpec(new CleanAppDirectoriesCommand(),
new Arg[]{new Arg("duration-in-millis")},
null,
"Clean up data directories of applications that terminated the given milliseconds ago"));
//
// Connected command specification starts here
//
connectedCommands.put("list-containers", new CommandSpec(new ListContainersCommand(),
null,
null,
"List containers"));
connectedCommands.put("list-operators", new CommandSpec(new ListOperatorsCommand(),
null,
new Arg[]{new Arg("pattern")},
"List operators"));
connectedCommands.put("show-physical-plan", new CommandSpec(new ShowPhysicalPlanCommand(),
null,
null,
"Show physical plan"));
connectedCommands.put("kill-container", new CommandSpec(new KillContainerCommand(),
new Arg[]{new Arg("container-id")},
new Arg[]{new VarArg("container-id")},
"Kill a container"));
connectedCommands.put("shutdown-app", new CommandSpec(new ShutdownAppCommand(),
null,
new Arg[]{new VarArg("app-id")},
"Shutdown an app"));
connectedCommands.put("kill-app", new CommandSpec(new KillAppCommand(),
null,
new Arg[]{new VarArg("app-id/app-name")},
"Kill an app"));
connectedCommands.put("wait", new CommandSpec(new WaitCommand(),
new Arg[]{new Arg("timeout")},
null,
"Wait for completion of current application"));
connectedCommands.put("start-recording", new CommandSpec(new StartRecordingCommand(),
new Arg[]{new Arg("operator-id")},
new Arg[]{new Arg("port-name"), new Arg("num-windows")},
"Start recording"));
connectedCommands.put("stop-recording", new CommandSpec(new StopRecordingCommand(),
new Arg[]{new Arg("operator-id")},
new Arg[]{new Arg("port-name")},
"Stop recording"));
connectedCommands.put("get-operator-attributes", new CommandSpec(new GetOperatorAttributesCommand(),
new Arg[]{new Arg("operator-name")},
new Arg[]{new Arg("attribute-name")},
"Get attributes of an operator"));
connectedCommands.put("get-operator-properties", new CommandSpec(new GetOperatorPropertiesCommand(),
new Arg[]{new Arg("operator-name")},
new Arg[]{new Arg("property-name")},
"Get properties of a logical operator"));
connectedCommands.put("get-physical-operator-properties", new OptionsCommandSpec(new GetPhysicalOperatorPropertiesCommand(),
new Arg[]{new Arg("operator-id")},
null,
"Get properties of a physical operator", GET_PHYSICAL_PROPERTY_OPTIONS.options));
connectedCommands.put("set-operator-property", new CommandSpec(new SetOperatorPropertyCommand(),
new Arg[]{new Arg("operator-name"), new Arg("property-name"), new Arg("property-value")},
null,
"Set a property of an operator"));
connectedCommands.put("set-physical-operator-property", new CommandSpec(new SetPhysicalOperatorPropertyCommand(),
new Arg[]{new Arg("operator-id"), new Arg("property-name"), new Arg("property-value")},
null,
"Set a property of an operator"));
connectedCommands.put("get-app-attributes", new CommandSpec(new GetAppAttributesCommand(),
null,
new Arg[]{new Arg("attribute-name")},
"Get attributes of the connected app"));
connectedCommands.put("get-port-attributes", new CommandSpec(new GetPortAttributesCommand(),
new Arg[]{new Arg("operator-name"), new Arg("port-name")},
new Arg[]{new Arg("attribute-name")},
"Get attributes of a port"));
connectedCommands.put("begin-logical-plan-change", new CommandSpec(new BeginLogicalPlanChangeCommand(),
null,
null,
"Begin Logical Plan Change"));
connectedCommands.put("show-logical-plan", new OptionsCommandSpec(new ShowLogicalPlanCommand(),
null,
new Arg[]{new FileArg("jar-file/app-package-file-path/app-package-file-uri"), new Arg("class-name")},
"Show logical plan of an app class",
getShowLogicalPlanCommandLineOptions()));
connectedCommands.put("dump-properties-file", new CommandSpec(new DumpPropertiesFileCommand(),
new Arg[]{new FileArg("out-file")},
new Arg[]{new FileArg("jar-file"), new Arg("class-name")},
"Dump the properties file of an app class"));
connectedCommands.put("get-app-info", new CommandSpec(new GetAppInfoCommand(),
null,
new Arg[]{new Arg("app-id")},
"Get the information of an app"));
connectedCommands.put("get-recording-info", new CommandSpec(new GetRecordingInfoCommand(),
null,
new Arg[]{new Arg("operator-id"), new Arg("start-time")},
"Get tuple recording info"));
connectedCommands.put("get-container-stacktrace", new CommandSpec(new GetContainerStackTrace(),
null,
new Arg[]{new Arg("container-id")},
"Get the stack trace for the container"));
//
// Logical plan change command specification starts here
//
logicalPlanChangeCommands.put("help", new CommandSpec(new HelpCommand(),
null,
new Arg[]{new Arg("command")},
"Show help"));
logicalPlanChangeCommands.put("create-operator", new CommandSpec(new CreateOperatorCommand(),
new Arg[]{new Arg("operator-name"), new Arg("class-name")},
null,
"Create an operator"));
logicalPlanChangeCommands.put("create-stream", new CommandSpec(new CreateStreamCommand(),
new Arg[]{new Arg("stream-name"), new Arg("from-operator-name"), new Arg("from-port-name"), new Arg("to-operator-name"), new Arg("to-port-name")},
null,
"Create a stream"));
logicalPlanChangeCommands.put("add-stream-sink", new CommandSpec(new AddStreamSinkCommand(),
new Arg[]{new Arg("stream-name"), new Arg("to-operator-name"), new Arg("to-port-name")},
null,
"Add a sink to an existing stream"));
logicalPlanChangeCommands.put("remove-operator", new CommandSpec(new RemoveOperatorCommand(),
new Arg[]{new Arg("operator-name")},
null,
"Remove an operator"));
logicalPlanChangeCommands.put("remove-stream", new CommandSpec(new RemoveStreamCommand(),
new Arg[]{new Arg("stream-name")},
null,
"Remove a stream"));
logicalPlanChangeCommands.put("set-operator-property", new CommandSpec(new SetOperatorPropertyCommand(),
new Arg[]{new Arg("operator-name"), new Arg("property-name"), new Arg("property-value")},
null,
"Set a property of an operator"));
logicalPlanChangeCommands.put("set-operator-attribute", new CommandSpec(new SetOperatorAttributeCommand(),
new Arg[]{new Arg("operator-name"), new Arg("attr-name"), new Arg("attr-value")},
null,
"Set an attribute of an operator"));
logicalPlanChangeCommands.put("set-port-attribute", new CommandSpec(new SetPortAttributeCommand(),
new Arg[]{new Arg("operator-name"), new Arg("port-name"), new Arg("attr-name"), new Arg("attr-value")},
null,
"Set an attribute of a port"));
logicalPlanChangeCommands.put("set-stream-attribute", new CommandSpec(new SetStreamAttributeCommand(),
new Arg[]{new Arg("stream-name"), new Arg("attr-name"), new Arg("attr-value")},
null,
"Set an attribute of a stream"));
logicalPlanChangeCommands.put("show-queue", new CommandSpec(new ShowQueueCommand(),
null,
null,
"Show the queue of the plan change"));
logicalPlanChangeCommands.put("submit", new CommandSpec(new SubmitCommand(),
null,
null,
"Submit the plan change"));
logicalPlanChangeCommands.put("abort", new CommandSpec(new AbortCommand(),
null,
null,
"Abort the plan change"));
}
private void printJson(String json) throws IOException
{
PrintStream os = getOutputPrintStream();
if (jsonp != null) {
os.println(jsonp + "(" + json + ");");
} else {
os.println(json);
}
os.flush();
closeOutputPrintStream(os);
}
private void printJson(JSONObject json) throws JSONException, IOException
{
printJson(raw ? json.toString() : json.toString(2));
}
private void printJson(JSONArray jsonArray, String name) throws JSONException, IOException
{
JSONObject json = new JSONObject();
json.put(name, jsonArray);
printJson(json);
}
private <K, V> void printJson(Map<K, V> map) throws IOException, JSONException
{
printJson(new JSONObject(mapper.writeValueAsString(map)));
}
private <T> void printJson(List<T> list, String name) throws IOException, JSONException
{
printJson(new JSONArray(mapper.writeValueAsString(list)), name);
}
private PrintStream getOutputPrintStream() throws IOException
{
if (pagerCommand == null) {
pagerProcess = null;
return System.out;
} else {
pagerProcess = Runtime.getRuntime().exec(new String[]{"sh", "-c",
pagerCommand + " >/dev/tty"});
return new PrintStream(pagerProcess.getOutputStream());
}
}
private void closeOutputPrintStream(PrintStream os)
{
if (os != System.out) {
os.close();
try {
pagerProcess.waitFor();
} catch (InterruptedException ex) {
LOG.debug("Interrupted");
}
}
}
private static String expandFileName(String fileName, boolean expandWildCard) throws IOException
{
if (fileName.matches("^[a-zA-Z]+:.*")) {
// it's a URL
return fileName;
}
// TODO: need to work with other users' home directory
if (fileName.startsWith("~" + File.separator)) {
fileName = System.getProperty("user.home") + fileName.substring(1);
}
fileName = new File(fileName).getCanonicalPath();
//LOG.debug("Canonical path: {}", fileName);
if (expandWildCard) {
DirectoryScanner scanner = new DirectoryScanner();
scanner.setIncludes(new String[]{fileName});
scanner.scan();
String[] files = scanner.getIncludedFiles();
if (files.length == 0) {
throw new CliException(fileName + " does not match any file");
} else if (files.length > 1) {
throw new CliException(fileName + " matches more than one file");
}
return files[0];
} else {
return fileName;
}
}
private static String[] expandFileNames(String fileName) throws IOException
{
// TODO: need to work with other users
if (fileName.matches("^[a-zA-Z]+:.*")) {
// it's a URL
return new String[]{fileName};
}
if (fileName.startsWith("~" + File.separator)) {
fileName = System.getProperty("user.home") + fileName.substring(1);
}
fileName = new File(fileName).getCanonicalPath();
LOG.debug("Canonical path: {}", fileName);
DirectoryScanner scanner = new DirectoryScanner();
scanner.setIncludes(new String[]{fileName});
scanner.scan();
return scanner.getIncludedFiles();
}
private static String expandCommaSeparatedFiles(String filenames) throws IOException
{
String[] entries = filenames.split(",");
StringBuilder result = new StringBuilder(filenames.length());
for (String entry : entries) {
for (String file : expandFileNames(entry)) {
if (result.length() > 0) {
result.append(",");
}
result.append(file);
}
}
if (result.length() == 0) {
return null;
}
return result.toString();
}
protected ApplicationReport getApplicationByName(String appName)
{
if (appName == null) {
throw new CliException("Invalid application name provided by user");
}
List<ApplicationReport> appList = getApplicationList();
for (ApplicationReport ar : appList) {
if ((ar.getName().equals(appName)) &&
(ar.getYarnApplicationState() != YarnApplicationState.KILLED) &&
(ar.getYarnApplicationState() != YarnApplicationState.FINISHED)) {
LOG.debug("Application Name: {} Application ID: {} Application State: {}",
ar.getName(), ar.getApplicationId().toString(), YarnApplicationState.FINISHED);
return ar;
}
}
return null;
}
protected ApplicationReport getApplication(String appId)
{
List<ApplicationReport> appList = getApplicationList();
if (StringUtils.isNumeric(appId)) {
int appSeq = Integer.parseInt(appId);
for (ApplicationReport ar : appList) {
if (ar.getApplicationId().getId() == appSeq) {
return ar;
}
}
} else {
for (ApplicationReport ar : appList) {
if (ar.getApplicationId().toString().equals(appId)) {
return ar;
}
}
}
return null;
}
static class CliException extends RuntimeException
{
private static final long serialVersionUID = 1L;
CliException(String msg, Throwable cause)
{
super(msg, cause);
}
CliException(String msg)
{
super(msg);
}
}
public void preImpersonationInit(String[] args) throws IOException
{
Signal.handle(new Signal("INT"), new SignalHandler()
{
@Override
public void handle(Signal sig)
{
System.out.println("^C");
if (commandThread != null) {
commandThread.interrupt();
mainThread.interrupt();
} else {
System.out.print(prompt);
System.out.flush();
}
}
});
consolePresent = (System.console() != null);
Options options = new Options();
options.addOption("e", true, "Commands are read from the argument");
options.addOption("v", false, "Verbose mode level 1");
options.addOption("vv", false, "Verbose mode level 2");
options.addOption("vvv", false, "Verbose mode level 3");
options.addOption("vvvv", false, "Verbose mode level 4");
options.addOption("r", false, "JSON Raw mode");
options.addOption("p", true, "JSONP padding function");
options.addOption("h", false, "Print this help");
options.addOption("f", true, "Use the specified prompt at all time");
options.addOption("kp", true, "Use the specified kerberos principal");
options.addOption("kt", true, "Use the specified kerberos keytab");
CommandLineParser parser = new BasicParser();
try {
CommandLine cmd = parser.parse(options, args);
if (cmd.hasOption("v")) {
verboseLevel = 1;
}
if (cmd.hasOption("vv")) {
verboseLevel = 2;
}
if (cmd.hasOption("vvv")) {
verboseLevel = 3;
}
if (cmd.hasOption("vvvv")) {
verboseLevel = 4;
}
if (cmd.hasOption("r")) {
raw = true;
}
if (cmd.hasOption("e")) {
commandsToExecute = cmd.getOptionValues("e");
consolePresent = false;
}
if (cmd.hasOption("p")) {
jsonp = cmd.getOptionValue("p");
}
if (cmd.hasOption("f")) {
forcePrompt = cmd.getOptionValue("f");
}
if (cmd.hasOption("h")) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp(ApexCli.class.getSimpleName(), options);
System.exit(0);
}
if (cmd.hasOption("kp")) {
kerberosPrincipal = cmd.getOptionValue("kp");
}
if (cmd.hasOption("kt")) {
kerberosKeyTab = cmd.getOptionValue("kt");
}
} catch (ParseException ex) {
System.err.println("Invalid argument: " + ex);
System.exit(1);
}
if (kerberosPrincipal == null && kerberosKeyTab != null) {
System.err.println("Kerberos key tab is specified but not the kerberos principal. Please specify it using the -kp option.");
System.exit(1);
}
if (kerberosPrincipal != null && kerberosKeyTab == null) {
System.err.println("Kerberos principal is specified but not the kerberos key tab. Please specify it using the -kt option.");
System.exit(1);
}
Level logLevel;
switch (verboseLevel) {
case 0:
logLevel = Level.OFF;
break;
case 1:
logLevel = Level.ERROR;
break;
case 2:
logLevel = Level.WARN;
break;
case 3:
logLevel = Level.INFO;
break;
default:
logLevel = Level.DEBUG;
break;
}
for (org.apache.log4j.Logger logger : new org.apache.log4j.Logger[]{
org.apache.log4j.Logger.getRootLogger(),
org.apache.log4j.Logger.getLogger(ApexCli.class)
}) {
/*
* Override logLevel specified by user, the same logLevel would be inherited by all
* appenders related to logger.
*/
logger.setLevel(logLevel);
@SuppressWarnings("unchecked")
Enumeration<Appender> allAppenders = logger.getAllAppenders();
while (allAppenders.hasMoreElements()) {
Appender appender = allAppenders.nextElement();
if (appender instanceof ConsoleAppender) {
((ConsoleAppender)appender).setThreshold(logLevel);
}
}
}
if (commandsToExecute != null) {
for (String command : commandsToExecute) {
LOG.debug("Command to be executed: {}", command);
}
}
if (kerberosPrincipal != null && kerberosKeyTab != null) {
StramUserLogin.authenticate(kerberosPrincipal, kerberosKeyTab);
} else {
Configuration config = new YarnConfiguration();
StramClientUtils.addDTLocalResources(config);
StramUserLogin.attemptAuthentication(config);
}
}
/**
* get highlight color based on env variable first and then config
*
*/
protected String getHighlightColor()
{
if (highlightColor == null) {
highlightColor = System.getenv(APEX_HIGHLIGHT_COLOR_ENV_VAR_NAME);
if (StringUtils.isBlank(highlightColor)) {
highlightColor = conf.get(APEX_HIGHLIGHT_COLOR_PROPERTY_NAME, FORMAT_BOLD);
}
highlightColor = highlightColor.replace("\\e", "\033");
}
return highlightColor;
}
public void init() throws IOException
{
conf = StramClientUtils.addDTSiteResources(new YarnConfiguration());
SecurityUtils.init(conf);
fs = StramClientUtils.newFileSystemInstance(conf);
stramAgent = new StramAgent(fs, conf);
yarnClient.init(conf);
yarnClient.start();
LOG.debug("Yarn Client initialized and started");
String socks = conf.get(CommonConfigurationKeysPublic.HADOOP_SOCKS_SERVER_KEY);
if (socks != null) {
int colon = socks.indexOf(':');
if (colon > 0) {
LOG.info("Using socks proxy at {}", socks);
System.setProperty("socksProxyHost", socks.substring(0, colon));
System.setProperty("socksProxyPort", socks.substring(colon + 1));
}
}
}
private void processSourceFile(String fileName, ConsoleReader reader) throws FileNotFoundException, IOException
{
fileName = expandFileName(fileName, true);
LOG.debug("Sourcing {}", fileName);
boolean consolePresentSaved = consolePresent;
consolePresent = false;
FileLineReader fr = null;
String line;
try {
fr = new FileLineReader(fileName);
while ((line = fr.readLine("")) != null) {
processLine(line, fr, true);
}
} finally {
consolePresent = consolePresentSaved;
if (fr != null) {
fr.close();
}
}
}
private static final class MyNullCompleter implements Completer
{
public static final MyNullCompleter INSTANCE = new MyNullCompleter();
@Override
public int complete(final String buffer, final int cursor, final List<CharSequence> candidates)
{
candidates.add("");
return cursor;
}
}
private static final class MyFileNameCompleter extends FileNameCompleter
{
@Override
public int complete(final String buffer, final int cursor, final List<CharSequence> candidates)
{
int result = super.complete(buffer, cursor, candidates);
if (candidates.isEmpty()) {
candidates.add("");
result = cursor;
}
return result;
}
}
private List<Completer> defaultCompleters()
{
Map<String, CommandSpec> commands = new TreeMap<>();
commands.putAll(logicalPlanChangeCommands);
commands.putAll(connectedCommands);
commands.putAll(globalCommands);
List<Completer> completers = new LinkedList<>();
for (Map.Entry<String, CommandSpec> entry : commands.entrySet()) {
String command = entry.getKey();
CommandSpec cs = entry.getValue();
List<Completer> argCompleters = new LinkedList<>();
argCompleters.add(new StringsCompleter(command));
Arg[] args = (Arg[])ArrayUtils.addAll(cs.requiredArgs, cs.optionalArgs);
if (args != null) {
if (cs instanceof OptionsCommandSpec) {
// ugly hack because jline cannot dynamically change completer while user types
if (args[0] instanceof FileArg || args[0] instanceof VarArg) {
for (int i = 0; i < 10; i++) {
argCompleters.add(new MyFileNameCompleter());
}
}
} else {
for (Arg arg : args) {
if (arg instanceof FileArg || arg instanceof VarArg) {
argCompleters.add(new MyFileNameCompleter());
} else if (arg instanceof CommandArg) {
argCompleters.add(new StringsCompleter(commands.keySet().toArray(new String[]{})));
} else {
argCompleters.add(MyNullCompleter.INSTANCE);
}
}
}
}
completers.add(new ArgumentCompleter(argCompleters));
}
List<Completer> argCompleters = new LinkedList<>();
Set<String> set = new TreeSet<>();
set.addAll(aliases.keySet());
set.addAll(macros.keySet());
argCompleters.add(new StringsCompleter(set.toArray(new String[]{})));
for (int i = 0; i < 10; i++) {
argCompleters.add(new MyFileNameCompleter());
}
completers.add(new ArgumentCompleter(argCompleters));
return completers;
}
private void setupCompleter(ConsoleReader reader)
{
reader.addCompleter(new AggregateCompleter(defaultCompleters()));
}
private void updateCompleter(ConsoleReader reader)
{
List<Completer> completers = new ArrayList<>(reader.getCompleters());
for (Completer c : completers) {
reader.removeCompleter(c);
}
setupCompleter(reader);
}
private void setupHistory(ConsoleReader reader)
{
File historyFile = new File(StramClientUtils.getUserDTDirectory(), "cli_history");
historyFile.getParentFile().mkdirs();
try {
topLevelHistory = new FileHistory(historyFile);
reader.setHistory(topLevelHistory);
historyFile = new File(StramClientUtils.getUserDTDirectory(), "cli_history_clp");
changingLogicalPlanHistory = new FileHistory(historyFile);
} catch (IOException ex) {
System.err.printf("Unable to open %s for writing.", historyFile);
}
}
private void setupAgents() throws IOException
{
recordingsAgent = new RecordingsAgent(stramAgent);
}
public void run() throws IOException
{
ConsoleReader reader = new ConsoleReader();
reader.setExpandEvents(false);
reader.setBellEnabled(false);
try {
processSourceFile(StramClientUtils.getConfigDir() + "/clirc_system", reader);
} catch (Exception ex) {
// ignore
}
try {
processSourceFile(StramClientUtils.getUserDTDirectory() + "/clirc", reader);
} catch (Exception ex) {
// ignore
}
if (consolePresent) {
printWelcomeMessage();
setupCompleter(reader);
setupHistory(reader);
//reader.setHandleUserInterrupt(true);
} else {
reader.setEchoCharacter((char)0);
}
setupAgents();
String line;
PrintWriter out = new PrintWriter(System.out);
int i = 0;
while (true) {
if (commandsToExecute != null) {
if (i >= commandsToExecute.length) {
break;
}
line = commandsToExecute[i++];
} else {
line = readLine(reader);
if (line == null) {
break;
}
}
processLine(line, reader, true);
out.flush();
}
if (topLevelHistory != null) {
try {
topLevelHistory.flush();
} catch (IOException ex) {
LOG.warn("Cannot flush command history", ex);
}
}
if (changingLogicalPlanHistory != null) {
try {
changingLogicalPlanHistory.flush();
} catch (IOException ex) {
LOG.warn("Cannot flush command history", ex);
}
}
if (consolePresent) {
System.out.println("exit");
}
}
private List<String> expandMacro(List<String> lines, String[] args)
{
List<String> expandedLines = new ArrayList<>();
for (String line : lines) {
int previousIndex = 0;
StringBuilder expandedLine = new StringBuilder(line.length());
while (true) {
// Search for $0..$9 within the each line and replace by corresponding args
int currentIndex = line.indexOf('$', previousIndex);
if (currentIndex > 0 && line.length() > currentIndex + 1) {
int argIndex = line.charAt(currentIndex + 1) - '0';
if (args.length > argIndex && argIndex >= 0) {
// Replace $0 with macro name or $1..$9 with input arguments
expandedLine.append(line.substring(previousIndex, currentIndex)).append(args[argIndex]);
} else if (argIndex >= 0 && argIndex <= 9) {
// Arguments for $1..$9 were not supplied - replace with empty strings
expandedLine.append(line.substring(previousIndex, currentIndex));
} else {
// Outside valid arguments range - ignore and do not replace
expandedLine.append(line.substring(previousIndex, currentIndex + 2));
}
currentIndex += 2;
} else {
expandedLine.append(line.substring(previousIndex));
expandedLines.add(expandedLine.toString());
break;
}
previousIndex = currentIndex;
}
}
return expandedLines;
}
private static String ltrim(String s)
{
int i = 0;
while (i < s.length() && Character.isWhitespace(s.charAt(i))) {
i++;
}
return s.substring(i);
}
private void processLine(String line, final ConsoleReader reader, boolean expandMacroAlias)
{
try {
// clear interrupt flag
Thread.interrupted();
if (reader.isHistoryEnabled()) {
History history = reader.getHistory();
if (history instanceof FileHistory) {
try {
((FileHistory)history).flush();
} catch (IOException ex) {
// ignore
}
}
}
//LOG.debug("line: \"{}\"", line);
List<String[]> commands = tokenizer.tokenize(line);
if (commands == null) {
return;
}
for (final String[] args : commands) {
if (args.length == 0 || StringUtils.isBlank(args[0])) {
continue;
}
//LOG.debug("Got: {}", mapper.writeValueAsString(args));
if (expandMacroAlias) {
if (macros.containsKey(args[0])) {
List<String> macroItems = expandMacro(macros.get(args[0]), args);
for (String macroItem : macroItems) {
if (consolePresent) {
System.out.println("expanded-macro> " + macroItem);
}
processLine(macroItem, reader, false);
}
continue;
}
if (aliases.containsKey(args[0])) {
processLine(aliases.get(args[0]), reader, false);
continue;
}
}
CommandSpec cs = null;
if (changingLogicalPlan) {
cs = logicalPlanChangeCommands.get(args[0]);
} else {
if (currentApp != null) {
cs = connectedCommands.get(args[0]);
}
if (cs == null) {
cs = globalCommands.get(args[0]);
}
}
if (cs == null) {
if (connectedCommands.get(args[0]) != null) {
System.err.println("\"" + args[0] + "\" is valid only when connected to an application. Type \"connect <appid>\" to connect to an application.");
lastCommandError = true;
} else if (logicalPlanChangeCommands.get(args[0]) != null) {
System.err.println("\"" + args[0] + "\" is valid only when changing a logical plan. Type \"begin-logical-plan-change\" to change a logical plan");
lastCommandError = true;
} else {
System.err.println("Invalid command '" + args[0] + "'. Type \"help\" for list of commands");
lastCommandError = true;
}
} else {
try {
cs.verifyArguments(args);
} catch (CliException ex) {
cs.printUsage(args[0]);
throw ex;
}
final Command command = cs.command;
commandThread = new Thread()
{
@Override
public void run()
{
try {
command.execute(args, reader);
lastCommandError = false;
} catch (Exception e) {
handleException(e);
} catch (Error e) {
handleException(e);
System.err.println("Fatal error encountered");
System.exit(1);
}
}
};
mainThread = Thread.currentThread();
commandThread.start();
try {
commandThread.join();
} catch (InterruptedException ex) {
System.err.println("Interrupted");
}
commandThread = null;
}
}
} catch (Exception e) {
handleException(e);
}
}
private void handleException(Throwable e)
{
System.err.println(ExceptionUtils.getFullStackTrace(e));
LOG.error("Exception caught: ", e);
lastCommandError = true;
}
private void printWelcomeMessage()
{
VersionInfo v = VersionInfo.APEX_VERSION;
System.out.println("Apex CLI " + v.getVersion() + " " + v.getDate() + " " + v.getRevision());
}
private void printHelp(String command, CommandSpec commandSpec, PrintStream os)
{
if (consolePresent) {
os.print(getHighlightColor());
os.print(command);
os.print(COLOR_RESET);
} else {
os.print(command);
}
if (commandSpec instanceof OptionsCommandSpec) {
OptionsCommandSpec ocs = (OptionsCommandSpec)commandSpec;
if (ocs.options != null) {
os.print(" [options]");
}
}
if (commandSpec.requiredArgs != null) {
for (Arg arg : commandSpec.requiredArgs) {
if (consolePresent) {
os.print(" " + ITALICS + arg + COLOR_RESET);
} else {
os.print(" <" + arg + ">");
}
}
}
if (commandSpec.optionalArgs != null) {
for (Arg arg : commandSpec.optionalArgs) {
if (consolePresent) {
os.print(" [" + ITALICS + arg + COLOR_RESET);
} else {
os.print(" [<" + arg + ">");
}
if (arg instanceof VarArg) {
os.print(" ...");
}
os.print("]");
}
}
os.println("\n\t" + commandSpec.description);
if (commandSpec instanceof OptionsCommandSpec) {
OptionsCommandSpec ocs = (OptionsCommandSpec)commandSpec;
if (ocs.options != null) {
os.println("\tOptions:");
HelpFormatter formatter = new HelpFormatter();
PrintWriter pw = new PrintWriter(os);
formatter.printOptions(pw, 80, ocs.options, 12, 4);
pw.flush();
}
}
}
private void printHelp(Map<String, CommandSpec> commandSpecs, PrintStream os)
{
for (Map.Entry<String, CommandSpec> entry : commandSpecs.entrySet()) {
printHelp(entry.getKey(), entry.getValue(), os);
}
}
private String readLine(ConsoleReader reader)
throws IOException
{
if (forcePrompt == null) {
prompt = "";
if (consolePresent) {
if (changingLogicalPlan) {
prompt = "logical-plan-change";
} else {
prompt = "apex";
}
if (currentApp != null) {
prompt += " (";
prompt += currentApp.getApplicationId().toString();
prompt += ") ";
}
prompt += "> ";
}
} else {
prompt = forcePrompt;
}
String line = reader.readLine(prompt, consolePresent ? null : (char)0);
if (line == null) {
return null;
}
return ltrim(line);
}
private List<ApplicationReport> getApplicationList()
{
try {
return StramUtils.getApexApplicationList(yarnClient);
} catch (Exception e) {
throw new CliException("Error getting application list from resource manager", e);
}
}
private String getContainerLongId(String containerId)
{
JSONObject json = getResource(StramWebServices.PATH_PHYSICAL_PLAN_CONTAINERS, currentApp);
int shortId = 0;
if (StringUtils.isNumeric(containerId)) {
shortId = Integer.parseInt(containerId);
}
try {
Object containersObj = json.get("containers");
JSONArray containers;
if (containersObj instanceof JSONArray) {
containers = (JSONArray)containersObj;
} else {
containers = new JSONArray();
containers.put(containersObj);
}
if (containersObj != null) {
for (int o = containers.length(); o-- > 0; ) {
JSONObject container = containers.getJSONObject(o);
String id = container.getString("id");
if (id.equals(containerId) || (shortId != 0 && (id.endsWith("_" + shortId) || id.endsWith("0" + shortId)))) {
return id;
}
}
}
} catch (JSONException ex) {
// ignore
}
return null;
}
private ApplicationReport assertRunningApp(ApplicationReport app)
{
ApplicationReport r;
try {
r = yarnClient.getApplicationReport(app.getApplicationId());
if (r.getYarnApplicationState() != YarnApplicationState.RUNNING) {
String msg = String.format("Application %s not running (status %s)",
r.getApplicationId().getId(), r.getYarnApplicationState());
throw new CliException(msg);
}
} catch (YarnException rmExc) {
throw new CliException("Unable to determine application status", rmExc);
} catch (IOException rmExc) {
throw new CliException("Unable to determine application status", rmExc);
}
return r;
}
private JSONObject getResource(String resourcePath, ApplicationReport appReport)
{
return getResource(new StramAgent.StramUriSpec().path(resourcePath), appReport, new WebServicesClient.GetWebServicesHandler<JSONObject>());
}
private JSONObject getResource(StramAgent.StramUriSpec uriSpec, ApplicationReport appReport)
{
return getResource(uriSpec, appReport, new WebServicesClient.GetWebServicesHandler<JSONObject>());
}
private JSONObject getResource(StramAgent.StramUriSpec uriSpec, ApplicationReport appReport, WebServicesClient.WebServicesHandler handler)
{
if (appReport == null) {
throw new CliException("No application selected");
}
if (StringUtils.isEmpty(appReport.getTrackingUrl()) || appReport.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED) {
appReport = null;
throw new CliException("Application terminated");
}
WebServicesClient wsClient = new WebServicesClient();
try {
return stramAgent.issueStramWebRequest(wsClient, appReport.getApplicationId().toString(), uriSpec, handler);
} catch (Exception e) {
// check the application status as above may have failed due application termination etc.
if (appReport == currentApp) {
currentApp = assertRunningApp(appReport);
}
throw new CliException("Failed to request web service for appid " + appReport.getApplicationId().toString(), e);
}
}
private List<AppFactory> getMatchingAppFactories(StramAppLauncher submitApp, String matchString, boolean exactMatch)
{
try {
List<AppFactory> cfgList = submitApp.getBundledTopologies();
if (cfgList.isEmpty()) {
return null;
} else if (matchString == null) {
return cfgList;
} else {
List<AppFactory> result = new ArrayList<>();
if (!exactMatch) {
matchString = matchString.toLowerCase();
}
for (AppFactory ac : cfgList) {
String appName = ac.getName();
String appAlias = submitApp.getLogicalPlanConfiguration().getAppAlias(appName);
if (exactMatch) {
if (matchString.equals(appName) || matchString.equals(appAlias)) {
result.add(ac);
}
} else if (appName.toLowerCase().contains(matchString) || (appAlias != null && appAlias.toLowerCase()
.contains(matchString))) {
result.add(ac);
}
}
return result;
}
} catch (Exception ex) {
LOG.warn("Caught Exception: ", ex);
return null;
}
}
/*
* Below is the implementation of all commands
*/
private class HelpCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
PrintStream os = getOutputPrintStream();
if (args.length < 2) {
os.println("GLOBAL COMMANDS EXCEPT WHEN CHANGING LOGICAL PLAN:\n");
printHelp(globalCommands, os);
os.println();
os.println("COMMANDS WHEN CONNECTED TO AN APP (via connect <appid>) EXCEPT WHEN CHANGING LOGICAL PLAN:\n");
printHelp(connectedCommands, os);
os.println();
os.println("COMMANDS WHEN CHANGING LOGICAL PLAN (via begin-logical-plan-change):\n");
printHelp(logicalPlanChangeCommands, os);
os.println();
} else {
if (args[1].equals("help")) {
printHelp("help", globalCommands.get("help"), os);
} else {
boolean valid = false;
CommandSpec cs = globalCommands.get(args[1]);
if (cs != null) {
os.println("This usage is valid except when changing logical plan");
printHelp(args[1], cs, os);
os.println();
valid = true;
}
cs = connectedCommands.get(args[1]);
if (cs != null) {
os.println("This usage is valid when connected to an app except when changing logical plan");
printHelp(args[1], cs, os);
os.println();
valid = true;
}
cs = logicalPlanChangeCommands.get(args[1]);
if (cs != null) {
os.println("This usage is only valid when changing logical plan (via begin-logical-plan-change)");
printHelp(args[1], cs, os);
os.println();
valid = true;
}
if (!valid) {
os.println("Help for \"" + args[1] + "\" does not exist.");
}
}
}
closeOutputPrintStream(os);
}
}
private class EchoCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
for (int i = 1; i < args.length; i++) {
if (i > 1) {
System.out.print(" ");
}
System.out.print(args[i]);
}
System.out.println();
}
}
private class ConnectCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
currentApp = getApplication(args[1]);
if (currentApp == null) {
throw new CliException("Streaming application with id " + args[1] + " is not found.");
}
LOG.debug("Selected {} with tracking url {}", currentApp.getApplicationId(), currentApp.getTrackingUrl());
getResource(StramWebServices.PATH_INFO, currentApp);
if (consolePresent) {
System.out.println("Connected to application " + currentApp.getApplicationId());
}
}
}
private class LaunchCommand implements Command
{
@Override
@SuppressWarnings("SleepWhileInLoop")
public void execute(String[] args, ConsoleReader reader) throws Exception
{
String[] newArgs = new String[args.length - 1];
System.arraycopy(args, 1, newArgs, 0, args.length - 1);
LaunchCommandLineInfo commandLineInfo = getLaunchCommandLineInfo(newArgs);
if (commandLineInfo.configFile != null) {
commandLineInfo.configFile = expandFileName(commandLineInfo.configFile, true);
}
// see if the given config file is a config package
ConfigPackage cp = null;
String requiredAppPackageName = null;
try {
cp = new ConfigPackage(new File(commandLineInfo.configFile));
requiredAppPackageName = cp.getAppPackageName();
} catch (Exception ex) {
// fall through, it's not a config package
}
try {
Configuration config;
String configFile = cp == null ? commandLineInfo.configFile : null;
try {
config = StramAppLauncher.getOverriddenConfig(StramClientUtils.addDTSiteResources(new Configuration()), configFile, commandLineInfo.overrideProperties);
if (commandLineInfo.libjars != null) {
commandLineInfo.libjars = expandCommaSeparatedFiles(commandLineInfo.libjars);
if (commandLineInfo.libjars != null) {
config.set(StramAppLauncher.LIBJARS_CONF_KEY_NAME, commandLineInfo.libjars);
}
}
if (commandLineInfo.files != null) {
commandLineInfo.files = expandCommaSeparatedFiles(commandLineInfo.files);
if (commandLineInfo.files != null) {
config.set(StramAppLauncher.FILES_CONF_KEY_NAME, commandLineInfo.files);
}
}
if (commandLineInfo.archives != null) {
commandLineInfo.archives = expandCommaSeparatedFiles(commandLineInfo.archives);
if (commandLineInfo.archives != null) {
config.set(StramAppLauncher.ARCHIVES_CONF_KEY_NAME, commandLineInfo.archives);
}
}
if (commandLineInfo.origAppId != null) {
config.set(StramAppLauncher.ORIGINAL_APP_ID, commandLineInfo.origAppId);
}
config.set(StramAppLauncher.QUEUE_NAME, commandLineInfo.queue != null ? commandLineInfo.queue : "default");
if (commandLineInfo.tags != null) {
config.set(StramAppLauncher.TAGS, commandLineInfo.tags);
}
} catch (Exception ex) {
throw new CliException("Error opening the config XML file: " + configFile, ex);
}
StramAppLauncher submitApp;
AppFactory appFactory = null;
String matchString = null;
if (commandLineInfo.args.length == 0) {
if (commandLineInfo.origAppId == null) {
throw new CliException("Launch requires an APA or JAR file when not resuming a terminated application");
}
submitApp = new StramAppLauncher(fs, config);
appFactory = submitApp.new RecoveryAppFactory();
} else {
String fileName = expandFileName(commandLineInfo.args[0], true);
if (commandLineInfo.args.length >= 2) {
matchString = commandLineInfo.args[1];
}
if (fileName.endsWith(".json")) {
File file = new File(fileName);
submitApp = new StramAppLauncher(file.getName(), config);
appFactory = new StramAppLauncher.JsonFileAppFactory(file);
if (matchString != null) {
LOG.warn("Match string \"{}\" is ignored for launching applications specified in JSON", matchString);
}
} else if (fileName.endsWith(".properties")) {
File file = new File(fileName);
submitApp = new StramAppLauncher(file.getName(), config);
appFactory = new StramAppLauncher.PropertyFileAppFactory(file);
if (matchString != null) {
LOG.warn("Match string \"{}\" is ignored for launching applications specified in properties file", matchString);
}
} else {
// see if it's an app package
AppPackage ap = null;
try {
ap = newAppPackageInstance(new URI(fileName), true);
} catch (Exception ex) {
// It's not an app package
if (requiredAppPackageName != null) {
throw new CliException("Config package requires an app package name of \"" + requiredAppPackageName + "\"");
}
}
if (ap != null) {
try {
if (!commandLineInfo.force) {
checkPlatformCompatible(ap);
checkConfigPackageCompatible(ap, cp);
}
launchAppPackage(ap, cp, commandLineInfo, reader);
return;
} finally {
IOUtils.closeQuietly(ap);
}
}
submitApp = getStramAppLauncher(fileName, config, commandLineInfo.ignorePom);
}
}
submitApp.loadDependencies();
if (commandLineInfo.origAppId != null) {
// ensure app is not running
ApplicationReport ar = null;
try {
ar = getApplication(commandLineInfo.origAppId);
} catch (Exception e) {
// application (no longer) in the RM history, does not prevent restart from state in DFS
LOG.debug("Cannot determine status of application {} {}", commandLineInfo.origAppId, ExceptionUtils.getMessage(e));
}
if (ar != null) {
if (ar.getFinalApplicationStatus() == FinalApplicationStatus.UNDEFINED) {
throw new CliException("Cannot relaunch non-terminated application: " + commandLineInfo.origAppId + " " + ar.getYarnApplicationState());
}
if (appFactory == null && matchString == null) {
// skip selection if we can match application name from previous run
List<AppFactory> matchingAppFactories = getMatchingAppFactories(submitApp, ar.getName(), commandLineInfo.exactMatch);
for (AppFactory af : matchingAppFactories) {
String appName = submitApp.getLogicalPlanConfiguration().getAppAlias(af.getName());
if (appName == null) {
appName = af.getName();
}
// limit to exact match
if (appName.equals(ar.getName())) {
appFactory = af;
break;
}
}
}
}
}
if (appFactory == null && matchString != null) {
// attempt to interpret argument as property file - do we still need it?
try {
File file = new File(expandFileName(commandLineInfo.args[1], true));
if (file.exists()) {
if (commandLineInfo.args[1].endsWith(".properties")) {
appFactory = new StramAppLauncher.PropertyFileAppFactory(file);
} else if (commandLineInfo.args[1].endsWith(".json")) {
appFactory = new StramAppLauncher.JsonFileAppFactory(file);
}
}
} catch (Exception | NoClassDefFoundError ex) {
// ignore
}
}
if (appFactory == null) {
List<AppFactory> matchingAppFactories = getMatchingAppFactories(submitApp, matchString, commandLineInfo.exactMatch);
if (matchingAppFactories == null || matchingAppFactories.isEmpty()) {
throw new CliException("No applications matching \"" + matchString + "\" bundled in jar.");
} else if (matchingAppFactories.size() == 1) {
appFactory = matchingAppFactories.get(0);
} else if (matchingAppFactories.size() > 1) {
//Store the appNames sorted in alphabetical order and their position in matchingAppFactories list
TreeMap<String, Integer> appNamesInAlphabeticalOrder = new TreeMap<>();
// Display matching applications
for (int i = 0; i < matchingAppFactories.size(); i++) {
String appName = matchingAppFactories.get(i).getName();
String appAlias = submitApp.getLogicalPlanConfiguration().getAppAlias(appName);
if (appAlias != null) {
appName = appAlias;
}
appNamesInAlphabeticalOrder.put(appName, i);
}
//Create a mapping between the app display number and original index at matchingAppFactories
int index = 1;
HashMap<Integer, Integer> displayIndexToOriginalUnsortedIndexMap = new HashMap<>();
for (Map.Entry<String, Integer> entry : appNamesInAlphabeticalOrder.entrySet()) {
//Map display number of the app to original unsorted index
displayIndexToOriginalUnsortedIndexMap.put(index, entry.getValue());
//Display the app names
System.out.printf("%3d. %s\n", index++, entry.getKey());
}
// Exit if not in interactive mode
if (!consolePresent) {
throw new CliException("More than one application in jar file match '" + matchString + "'");
} else {
boolean useHistory = reader.isHistoryEnabled();
reader.setHistoryEnabled(false);
History previousHistory = reader.getHistory();
History dummyHistory = new MemoryHistory();
reader.setHistory(dummyHistory);
List<Completer> completers = new ArrayList<>(reader.getCompleters());
for (Completer c : completers) {
reader.removeCompleter(c);
}
reader.setHandleUserInterrupt(true);
String optionLine;
try {
optionLine = reader.readLine("Choose application: ");
} finally {
reader.setHandleUserInterrupt(false);
reader.setHistoryEnabled(useHistory);
reader.setHistory(previousHistory);
for (Completer c : completers) {
reader.addCompleter(c);
}
}
try {
int option = Integer.parseInt(optionLine);
if (0 < option && option <= matchingAppFactories.size()) {
int appIndex = displayIndexToOriginalUnsortedIndexMap.get(option);
appFactory = matchingAppFactories.get(appIndex);
}
} catch (Exception ex) {
// ignore
}
}
}
}
if (appFactory != null) {
if (!commandLineInfo.localMode) {
// see whether there is an app with the same name and user name running
String appName = config.get(LogicalPlanConfiguration.KEY_APPLICATION_NAME, appFactory.getName());
ApplicationReport duplicateApp = StramClientUtils.getStartedAppInstanceByName(yarnClient, appName, UserGroupInformation.getLoginUser().getUserName(), null);
if (duplicateApp != null) {
throw new CliException("Application with the name \"" + duplicateApp.getName() + "\" already running under the current user \"" + duplicateApp.getUser() + "\". Please choose another name. You can change the name by setting " + LogicalPlanConfiguration.KEY_APPLICATION_NAME);
}
// This is for suppressing System.out printouts from applications so that the user of CLI will not be confused by those printouts
PrintStream originalStream = suppressOutput();
ApplicationId appId = null;
try {
if (raw) {
PrintStream dummyStream = new PrintStream(new OutputStream()
{
@Override
public void write(int b)
{
// no-op
}
});
System.setOut(dummyStream);
}
appId = submitApp.launchApp(appFactory);
currentApp = yarnClient.getApplicationReport(appId);
} finally {
restoreOutput(originalStream);
}
if (appId != null) {
printJson("{\"appId\": \"" + appId + "\"}");
}
} else {
submitApp.runLocal(appFactory);
}
} else {
System.err.println("No application specified.");
}
} finally {
IOUtils.closeQuietly(cp);
}
}
}
private class GetConfigParameterCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
PrintStream os = getOutputPrintStream();
if (args.length == 1) {
Map<String, String> sortedMap = new TreeMap<>();
for (Map.Entry<String, String> entry : conf) {
sortedMap.put(entry.getKey(), entry.getValue());
}
for (Map.Entry<String, String> entry : sortedMap.entrySet()) {
os.println(entry.getKey() + "=" + entry.getValue());
}
} else {
String value = conf.get(args[1]);
if (value != null) {
os.println(value);
}
}
closeOutputPrintStream(os);
}
}
private class ShutdownAppCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
ApplicationReport[] apps;
if (args.length == 1) {
if (currentApp == null) {
throw new CliException("No application selected");
} else {
apps = new ApplicationReport[]{currentApp};
}
} else {
apps = new ApplicationReport[args.length - 1];
for (int i = 1; i < args.length; i++) {
apps[i - 1] = getApplication(args[i]);
if (apps[i - 1] == null) {
throw new CliException("Streaming application with id " + args[i] + " is not found.");
}
}
}
for (ApplicationReport app : apps) {
try {
JSONObject response = getResource(new StramAgent.StramUriSpec().path(StramWebServices.PATH_SHUTDOWN), app, new WebServicesClient.WebServicesHandler<JSONObject>()
{
@Override
public JSONObject process(WebResource.Builder webResource, Class<JSONObject> clazz)
{
return webResource.accept(MediaType.APPLICATION_JSON).post(clazz, new JSONObject());
}
});
if (consolePresent) {
System.out.println("Shutdown requested: " + response);
}
currentApp = null;
} catch (Exception e) {
throw new CliException("Failed to request shutdown for appid " + app.getApplicationId().toString(), e);
}
}
}
}
private class ListAppsCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
try {
JSONArray jsonArray = new JSONArray();
List<ApplicationReport> appList = getApplicationList();
Collections.sort(appList, new Comparator<ApplicationReport>()
{
@Override
public int compare(ApplicationReport o1, ApplicationReport o2)
{
return o1.getApplicationId().getId() - o2.getApplicationId().getId();
}
});
int totalCnt = 0;
int runningCnt = 0;
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss Z");
for (ApplicationReport ar : appList) {
/*
* This is inefficient, but what the heck, if this can be passed through the command line, can anyone notice slowness.
*/
JSONObject jsonObj = new JSONObject();
jsonObj.put("startTime", sdf.format(new java.util.Date(ar.getStartTime())));
jsonObj.put("id", ar.getApplicationId().getId());
jsonObj.put("name", ar.getName());
jsonObj.put("state", ar.getYarnApplicationState().name());
jsonObj.put("trackingUrl", ar.getTrackingUrl());
jsonObj.put("finalStatus", ar.getFinalApplicationStatus());
JSONArray tags = new JSONArray();
for (String tag : ar.getApplicationTags()) {
tags.put(tag);
}
jsonObj.put("tags", tags);
totalCnt++;
if (ar.getYarnApplicationState() == YarnApplicationState.RUNNING) {
runningCnt++;
}
if (args.length > 1) {
if (StringUtils.isNumeric(args[1])) {
if (jsonObj.getString("id").equals(args[1])) {
jsonArray.put(jsonObj);
break;
}
} else {
@SuppressWarnings("unchecked")
Iterator<String> keys = jsonObj.keys();
while (keys.hasNext()) {
if (jsonObj.get(keys.next()).toString().toLowerCase().contains(args[1].toLowerCase())) {
jsonArray.put(jsonObj);
break;
}
}
}
} else {
jsonArray.put(jsonObj);
}
}
printJson(jsonArray, "apps");
if (consolePresent) {
System.out.println(runningCnt + " active, total " + totalCnt + " applications.");
}
} catch (Exception ex) {
throw new CliException("Failed to retrieve application list", ex);
}
}
}
private class KillAppCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
if (args.length == 1) {
if (currentApp == null) {
throw new CliException("No application selected");
} else {
try {
yarnClient.killApplication(currentApp.getApplicationId());
currentApp = null;
} catch (YarnException e) {
throw new CliException("Failed to kill " + currentApp.getApplicationId(), e);
}
}
if (consolePresent) {
System.out.println("Kill app requested");
}
return;
}
ApplicationReport app = null;
int i = 0;
try {
while (++i < args.length) {
app = getApplication(args[i]);
if (app == null) {
/*
* try once again with application name type.
*/
app = getApplicationByName(args[i]);
if (app == null) {
throw new CliException("Streaming application with id or name " + args[i] + " is not found.");
}
}
yarnClient.killApplication(app.getApplicationId());
if (app == currentApp) {
currentApp = null;
}
}
if (consolePresent) {
System.out.println("Kill app requested");
}
} catch (YarnException e) {
throw new CliException("Failed to kill " + ((app == null || app.getApplicationId() == null) ? "unknown application" : app.getApplicationId()) + ". Aborting killing of any additional applications.", e);
} catch (NumberFormatException nfe) {
throw new CliException("Invalid application Id " + args[i], nfe);
}
}
}
private class AliasCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
if (args[1].equals(args[2])) {
throw new CliException("Alias to itself!");
}
aliases.put(args[1], args[2]);
if (consolePresent) {
System.out.println("Alias " + args[1] + " created.");
}
updateCompleter(reader);
}
}
private class SourceCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
processSourceFile(args[1], reader);
if (consolePresent) {
System.out.println("File " + args[1] + " sourced.");
}
}
}
private class ExitCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
if (topLevelHistory != null) {
try {
topLevelHistory.flush();
} catch (IOException ex) {
LOG.warn("Cannot flush command history");
}
}
if (changingLogicalPlanHistory != null) {
try {
changingLogicalPlanHistory.flush();
} catch (IOException ex) {
LOG.warn("Cannot flush command history");
}
}
System.exit(0);
}
}
private class ListContainersCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
JSONObject json = getResource(StramWebServices.PATH_PHYSICAL_PLAN_CONTAINERS, currentApp);
if (args.length == 1) {
printJson(json);
} else {
Object containersObj = json.get("containers");
JSONArray containers;
if (containersObj instanceof JSONArray) {
containers = (JSONArray)containersObj;
} else {
containers = new JSONArray();
containers.put(containersObj);
}
if (containersObj == null) {
System.out.println("No containers found!");
} else {
JSONArray resultContainers = new JSONArray();
for (int o = containers.length(); o-- > 0; ) {
JSONObject container = containers.getJSONObject(o);
String id = container.getString("id");
if (id != null && !id.isEmpty()) {
for (int argc = args.length; argc-- > 1; ) {
String s1 = "0" + args[argc];
String s2 = "_" + args[argc];
if (id.equals(args[argc]) || id.endsWith(s1) || id.endsWith(s2)) {
resultContainers.put(container);
}
}
}
}
printJson(resultContainers, "containers");
}
}
}
}
private class ListOperatorsCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
JSONObject json = getResource(StramWebServices.PATH_PHYSICAL_PLAN_OPERATORS, currentApp);
if (args.length > 1) {
String singleKey = "" + json.keys().next();
JSONArray matches = new JSONArray();
// filter operators
JSONArray arr;
Object obj = json.get(singleKey);
if (obj instanceof JSONArray) {
arr = (JSONArray)obj;
} else {
arr = new JSONArray();
arr.put(obj);
}
for (int i = 0; i < arr.length(); i++) {
JSONObject oper = arr.getJSONObject(i);
if (StringUtils.isNumeric(args[1])) {
if (oper.getString("id").equals(args[1])) {
matches.put(oper);
break;
}
} else {
@SuppressWarnings("unchecked")
Iterator<String> keys = oper.keys();
while (keys.hasNext()) {
if (oper.get(keys.next()).toString().toLowerCase().contains(args[1].toLowerCase())) {
matches.put(oper);
break;
}
}
}
}
json.put(singleKey, matches);
}
printJson(json);
}
}
private class ShowPhysicalPlanCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
try {
printJson(getResource(StramWebServices.PATH_PHYSICAL_PLAN, currentApp));
} catch (Exception e) {
throw new CliException("Failed web service request for appid " + currentApp.getApplicationId().toString(), e);
}
}
}
private class KillContainerCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
for (int i = 1; i < args.length; i++) {
String containerLongId = getContainerLongId(args[i]);
if (containerLongId == null) {
throw new CliException("Container " + args[i] + " not found");
}
try {
StramAgent.StramUriSpec uriSpec = new StramAgent.StramUriSpec();
uriSpec = uriSpec.path(StramWebServices.PATH_PHYSICAL_PLAN_CONTAINERS).path(URLEncoder.encode(containerLongId, "UTF-8")).path("kill");
JSONObject response = getResource(uriSpec, currentApp, new WebServicesClient.WebServicesHandler<JSONObject>()
{
@Override
public JSONObject process(WebResource.Builder webResource, Class<JSONObject> clazz)
{
return webResource.accept(MediaType.APPLICATION_JSON).post(clazz, new JSONObject());
}
});
if (consolePresent) {
System.out.println("Kill container requested: " + response);
}
} catch (Exception e) {
throw new CliException("Failed web service request for appid " + currentApp.getApplicationId().toString(), e);
}
}
}
}
private class WaitCommand implements Command
{
@Override
public void execute(String[] args, final ConsoleReader reader) throws Exception
{
if (currentApp == null) {
throw new CliException("No application selected");
}
int timeout = Integer.valueOf(args[1]);
ClientRMHelper.AppStatusCallback cb = new ClientRMHelper.AppStatusCallback()
{
@Override
public boolean exitLoop(ApplicationReport report)
{
System.out.println("current status is: " + report.getYarnApplicationState());
try {
if (reader.getInput().available() > 0) {
return true;
}
} catch (IOException e) {
LOG.error("Error checking for input.", e);
}
return false;
}
};
try {
ClientRMHelper clientRMHelper = new ClientRMHelper(yarnClient, conf);
boolean result = clientRMHelper.waitForCompletion(currentApp.getApplicationId(), cb, timeout * 1000);
if (!result) {
System.err.println("Application terminated unsuccessfully.");
}
} catch (YarnException e) {
throw new CliException("Failed to kill " + currentApp.getApplicationId(), e);
}
}
}
private class StartRecordingCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
String opId = args[1];
String port = null;
long numWindows = 0;
if (args.length >= 3) {
port = args[2];
}
if (args.length >= 4) {
numWindows = Long.valueOf(args[3]);
}
printJson(recordingsAgent.startRecording(currentApp.getApplicationId().toString(), opId, port, numWindows));
}
}
private class StopRecordingCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
String opId = args[1];
String port = null;
if (args.length == 3) {
port = args[2];
}
printJson(recordingsAgent.stopRecording(currentApp.getApplicationId().toString(), opId, port));
}
}
private class GetRecordingInfoCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
if (args.length <= 1) {
List<RecordingInfo> recordingInfo = recordingsAgent.getRecordingInfo(currentApp.getApplicationId().toString());
printJson(recordingInfo, "recordings");
} else if (args.length <= 2) {
String opId = args[1];
List<RecordingInfo> recordingInfo = recordingsAgent.getRecordingInfo(currentApp.getApplicationId().toString(), opId);
printJson(recordingInfo, "recordings");
} else {
String opId = args[1];
String id = args[2];
RecordingInfo recordingInfo = recordingsAgent.getRecordingInfo(currentApp.getApplicationId().toString(), opId, id);
printJson(new JSONObject(mapper.writeValueAsString(recordingInfo)));
}
}
}
private class GetAppAttributesCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
if (currentApp == null) {
throw new CliException("No application selected");
}
StramAgent.StramUriSpec uriSpec = new StramAgent.StramUriSpec();
uriSpec = uriSpec.path(StramWebServices.PATH_LOGICAL_PLAN).path("attributes");
if (args.length > 1) {
uriSpec = uriSpec.queryParam("attributeName", args[1]);
}
try {
JSONObject response = getResource(uriSpec, currentApp);
printJson(response);
} catch (Exception e) {
throw new CliException("Failed web service request for appid " + currentApp.getApplicationId().toString(), e);
}
}
}
private class GetOperatorAttributesCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
if (currentApp == null) {
throw new CliException("No application selected");
}
StramAgent.StramUriSpec uriSpec = new StramAgent.StramUriSpec();
uriSpec = uriSpec.path(StramWebServices.PATH_LOGICAL_PLAN_OPERATORS).path(URLEncoder.encode(args[1], "UTF-8")).path("attributes");
if (args.length > 2) {
uriSpec = uriSpec.queryParam("attributeName", args[2]);
}
try {
JSONObject response = getResource(uriSpec, currentApp);
printJson(response);
} catch (Exception e) {
throw new CliException("Failed web service request for appid " + currentApp.getApplicationId().toString(), e);
}
}
}
private class GetPortAttributesCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
if (currentApp == null) {
throw new CliException("No application selected");
}
StramAgent.StramUriSpec uriSpec = new StramAgent.StramUriSpec();
uriSpec = uriSpec.path(StramWebServices.PATH_LOGICAL_PLAN_OPERATORS).path(URLEncoder.encode(args[1], "UTF-8")).path("ports").path(URLEncoder.encode(args[2], "UTF-8")).path("attributes");
if (args.length > 3) {
uriSpec = uriSpec.queryParam("attributeName", args[3]);
}
try {
JSONObject response = getResource(uriSpec, currentApp);
printJson(response);
} catch (Exception e) {
throw new CliException("Failed web service request for appid " + currentApp.getApplicationId().toString(), e);
}
}
}
private class GetOperatorPropertiesCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
if (currentApp == null) {
throw new CliException("No application selected");
}
StramAgent.StramUriSpec uriSpec = new StramAgent.StramUriSpec();
uriSpec = uriSpec.path(StramWebServices.PATH_LOGICAL_PLAN_OPERATORS).path(URLEncoder.encode(args[1], "UTF-8")).path("properties");
if (args.length > 2) {
uriSpec = uriSpec.queryParam("propertyName", args[2]);
}
try {
JSONObject response = getResource(uriSpec, currentApp);
printJson(response);
} catch (Exception e) {
throw new CliException("Failed web service request for appid " + currentApp.getApplicationId().toString(), e);
}
}
}
private class GetPhysicalOperatorPropertiesCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
if (currentApp == null) {
throw new CliException("No application selected");
}
if (!NumberUtils.isDigits(args[1])) {
throw new CliException("Operator ID must be a number");
}
String[] newArgs = new String[args.length - 1];
System.arraycopy(args, 1, newArgs, 0, args.length - 1);
PosixParser parser = new PosixParser();
CommandLine line = parser.parse(GET_PHYSICAL_PROPERTY_OPTIONS.options, newArgs);
String waitTime = line.getOptionValue(GET_PHYSICAL_PROPERTY_OPTIONS.waitTime.getOpt());
String propertyName = line.getOptionValue(GET_PHYSICAL_PROPERTY_OPTIONS.propertyName.getOpt());
StramAgent.StramUriSpec uriSpec = new StramAgent.StramUriSpec();
uriSpec = uriSpec.path(StramWebServices.PATH_PHYSICAL_PLAN_OPERATORS).path(args[1]).path("properties");
if (propertyName != null) {
uriSpec = uriSpec.queryParam("propertyName", propertyName);
}
if (waitTime != null) {
uriSpec = uriSpec.queryParam("waitTime", waitTime);
}
try {
JSONObject response = getResource(uriSpec, currentApp);
printJson(response);
} catch (Exception e) {
throw new CliException("Failed web service request for appid " + currentApp.getApplicationId().toString(), e);
}
}
}
private class SetOperatorPropertyCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
if (currentApp == null) {
throw new CliException("No application selected");
}
if (changingLogicalPlan) {
String operatorName = args[1];
String propertyName = args[2];
String propertyValue = args[3];
SetOperatorPropertyRequest request = new SetOperatorPropertyRequest();
request.setOperatorName(operatorName);
request.setPropertyName(propertyName);
request.setPropertyValue(propertyValue);
logicalPlanRequestQueue.add(request);
} else {
StramAgent.StramUriSpec uriSpec = new StramAgent.StramUriSpec();
uriSpec = uriSpec.path(StramWebServices.PATH_LOGICAL_PLAN_OPERATORS).path(URLEncoder.encode(args[1], "UTF-8")).path("properties");
final JSONObject request = new JSONObject();
request.put(args[2], args[3]);
JSONObject response = getResource(uriSpec, currentApp, new WebServicesClient.WebServicesHandler<JSONObject>()
{
@Override
public JSONObject process(WebResource.Builder webResource, Class<JSONObject> clazz)
{
return webResource.accept(MediaType.APPLICATION_JSON).post(JSONObject.class, request);
}
});
printJson(response);
}
}
}
private class SetPhysicalOperatorPropertyCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
if (currentApp == null) {
throw new CliException("No application selected");
}
if (!NumberUtils.isDigits(args[1])) {
throw new CliException("Operator ID must be a number");
}
StramAgent.StramUriSpec uriSpec = new StramAgent.StramUriSpec();
uriSpec = uriSpec.path(StramWebServices.PATH_PHYSICAL_PLAN_OPERATORS).path(args[1]).path("properties");
final JSONObject request = new JSONObject();
request.put(args[2], args[3]);
JSONObject response = getResource(uriSpec, currentApp, new WebServicesClient.WebServicesHandler<JSONObject>()
{
@Override
public JSONObject process(WebResource.Builder webResource, Class<JSONObject> clazz)
{
return webResource.accept(MediaType.APPLICATION_JSON).post(JSONObject.class, request);
}
});
printJson(response);
}
}
private class BeginLogicalPlanChangeCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
changingLogicalPlan = true;
reader.setHistory(changingLogicalPlanHistory);
}
}
private class ShowLogicalPlanCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
String[] newArgs = new String[args.length - 1];
System.arraycopy(args, 1, newArgs, 0, args.length - 1);
ShowLogicalPlanCommandLineInfo commandLineInfo = getShowLogicalPlanCommandLineInfo(newArgs);
Configuration config = StramClientUtils.addDTSiteResources(new Configuration());
if (commandLineInfo.libjars != null) {
commandLineInfo.libjars = expandCommaSeparatedFiles(commandLineInfo.libjars);
if (commandLineInfo.libjars != null) {
config.set(StramAppLauncher.LIBJARS_CONF_KEY_NAME, commandLineInfo.libjars);
}
}
if (commandLineInfo.args.length > 0) {
// see if the first argument is actually an app package
try (AppPackage ap = newAppPackageInstance(new URI(commandLineInfo.args[0]), false)) {
new ShowLogicalPlanAppPackageCommand().execute(args, reader);
return;
} catch (Exception ex) {
// fall through
}
String filename = expandFileName(commandLineInfo.args[0], true);
if (commandLineInfo.args.length >= 2) {
String appName = commandLineInfo.args[1];
StramAppLauncher submitApp = getStramAppLauncher(filename, config, commandLineInfo.ignorePom);
submitApp.loadDependencies();
List<AppFactory> matchingAppFactories = getMatchingAppFactories(submitApp, appName, commandLineInfo.exactMatch);
if (matchingAppFactories == null || matchingAppFactories.isEmpty()) {
throw new CliException("No application in jar file matches '" + appName + "'");
} else if (matchingAppFactories.size() > 1) {
throw new CliException("More than one application in jar file match '" + appName + "'");
} else {
Map<String, Object> map = new HashMap<>();
PrintStream originalStream = System.out;
AppFactory appFactory = matchingAppFactories.get(0);
try {
if (raw) {
PrintStream dummyStream = new PrintStream(new OutputStream()
{
@Override
public void write(int b)
{
// no-op
}
});
System.setOut(dummyStream);
}
LogicalPlan logicalPlan = appFactory.createApp(submitApp.getLogicalPlanConfiguration());
map.put("applicationName", appFactory.getName());
map.put("logicalPlan", LogicalPlanSerializer.convertToMap(logicalPlan, false));
} finally {
if (raw) {
System.setOut(originalStream);
}
}
printJson(map);
}
} else {
if (filename.endsWith(".json")) {
File file = new File(filename);
StramAppLauncher submitApp = new StramAppLauncher(file.getName(), config);
AppFactory appFactory = new StramAppLauncher.JsonFileAppFactory(file);
LogicalPlan logicalPlan = appFactory.createApp(submitApp.getLogicalPlanConfiguration());
Map<String, Object> map = new HashMap<>();
map.put("applicationName", appFactory.getName());
map.put("logicalPlan", LogicalPlanSerializer.convertToMap(logicalPlan, false));
printJson(map);
} else if (filename.endsWith(".properties")) {
File file = new File(filename);
StramAppLauncher submitApp = new StramAppLauncher(file.getName(), config);
AppFactory appFactory = new StramAppLauncher.PropertyFileAppFactory(file);
LogicalPlan logicalPlan = appFactory.createApp(submitApp.getLogicalPlanConfiguration());
Map<String, Object> map = new HashMap<>();
map.put("applicationName", appFactory.getName());
map.put("logicalPlan", LogicalPlanSerializer.convertToMap(logicalPlan, false));
printJson(map);
} else {
StramAppLauncher submitApp = getStramAppLauncher(filename, config, commandLineInfo.ignorePom);
submitApp.loadDependencies();
List<Map<String, Object>> appList = new ArrayList<>();
List<AppFactory> appFactoryList = submitApp.getBundledTopologies();
for (AppFactory appFactory : appFactoryList) {
Map<String, Object> m = new HashMap<>();
m.put("name", appFactory.getName());
appList.add(m);
}
printJson(appList, "applications");
}
}
} else {
if (currentApp == null) {
throw new CliException("No application selected");
}
JSONObject response = getResource(StramWebServices.PATH_LOGICAL_PLAN, currentApp);
printJson(response);
}
}
}
private class ShowLogicalPlanAppPackageCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
try (AppPackage ap = newAppPackageInstance(new URI(args[1]), true)) {
List<AppInfo> applications = ap.getApplications();
if (args.length >= 3) {
for (AppInfo appInfo : applications) {
if (args[2].equals(appInfo.name)) {
Map<String, Object> map = new HashMap<>();
map.put("applicationName", appInfo.name);
if (appInfo.dag != null) {
map.put("logicalPlan", LogicalPlanSerializer.convertToMap(appInfo.dag, false));
}
if (appInfo.error != null) {
map.put("error", appInfo.error);
}
printJson(map);
}
}
} else {
List<Map<String, Object>> appList = new ArrayList<>();
for (AppInfo appInfo : applications) {
Map<String, Object> m = new HashMap<>();
m.put("name", appInfo.name);
m.put("type", appInfo.type);
appList.add(m);
}
printJson(appList, "applications");
}
}
}
}
private File copyToLocal(String[] files) throws IOException
{
File tmpDir = new File(System.getProperty("java.io.tmpdir") + "/datatorrent/" + ManagementFactory.getRuntimeMXBean().getName());
tmpDir.mkdirs();
for (int i = 0; i < files.length; i++) {
try {
URI uri = new URI(files[i]);
String scheme = uri.getScheme();
if (scheme == null || scheme.equals("file")) {
files[i] = uri.getPath();
} else {
try (FileSystem tmpFs = FileSystem.newInstance(uri, conf)) {
Path srcPath = new Path(uri.getPath());
Path dstPath = new Path(tmpDir.getAbsolutePath(), String.valueOf(i) + srcPath.getName());
tmpFs.copyToLocalFile(srcPath, dstPath);
files[i] = dstPath.toUri().getPath();
}
}
} catch (URISyntaxException ex) {
throw new RuntimeException(ex);
}
}
return tmpDir;
}
private static Options GET_APP_PACKAGE_INFO_OPTIONS = new Options();
static {
GET_APP_PACKAGE_INFO_OPTIONS
.addOption(new Option("withDescription", false, "Get default properties with description"));
}
public static class GetOperatorClassesCommandLineOptions
{
final Options options = new Options();
final Option parent = add(new Option("parent", true, "Specify the parent class for the operators"));
private Option add(Option opt)
{
this.options.addOption(opt);
return opt;
}
}
private static GetOperatorClassesCommandLineOptions GET_OPERATOR_CLASSES_OPTIONS = new GetOperatorClassesCommandLineOptions();
static class GetAppPackageInfoCommandLineInfo
{
boolean provideDescription;
}
static GetAppPackageInfoCommandLineInfo getGetAppPackageInfoCommandLineInfo(String[] args) throws ParseException
{
CommandLineParser parser = new PosixParser();
GetAppPackageInfoCommandLineInfo result = new GetAppPackageInfoCommandLineInfo();
CommandLine line = parser.parse(GET_APP_PACKAGE_INFO_OPTIONS, args);
result.provideDescription = line.hasOption("withDescription");
return result;
}
static class GetOperatorClassesCommandLineInfo
{
String parent;
String[] args;
}
static GetOperatorClassesCommandLineInfo getGetOperatorClassesCommandLineInfo(String[] args) throws ParseException
{
CommandLineParser parser = new PosixParser();
GetOperatorClassesCommandLineInfo result = new GetOperatorClassesCommandLineInfo();
CommandLine line = parser.parse(GET_OPERATOR_CLASSES_OPTIONS.options, args);
result.parent = line.getOptionValue("parent");
result.args = line.getArgs();
return result;
}
private class GetJarOperatorClassesCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
String[] newArgs = new String[args.length - 1];
System.arraycopy(args, 1, newArgs, 0, args.length - 1);
GetOperatorClassesCommandLineInfo commandLineInfo = getGetOperatorClassesCommandLineInfo(newArgs);
String parentName = commandLineInfo.parent != null ? commandLineInfo.parent : GenericOperator.class.getName();
String files = expandCommaSeparatedFiles(commandLineInfo.args[0]);
if (files == null) {
throw new CliException("File " + commandLineInfo.args[0] + " is not found");
}
String[] jarFiles = files.split(",");
File tmpDir = copyToLocal(jarFiles);
try {
OperatorDiscoverer operatorDiscoverer = new OperatorDiscoverer(jarFiles);
String searchTerm = commandLineInfo.args.length > 1 ? commandLineInfo.args[1] : null;
Set<String> operatorClasses = operatorDiscoverer.getOperatorClasses(parentName, searchTerm);
JSONObject json = new JSONObject();
JSONArray arr = new JSONArray();
JSONObject portClassHier = new JSONObject();
JSONObject portTypesWithSchemaClasses = new JSONObject();
JSONObject failed = new JSONObject();
for (final String clazz : operatorClasses) {
try {
JSONObject oper = operatorDiscoverer.describeOperator(clazz);
// add default value
operatorDiscoverer.addDefaultValue(clazz, oper);
// add class hierarchy info to portClassHier and fetch port types with schema classes
operatorDiscoverer.buildAdditionalPortInfo(oper, portClassHier, portTypesWithSchemaClasses);
Iterator portTypesIter = portTypesWithSchemaClasses.keys();
while (portTypesIter.hasNext()) {
if (!portTypesWithSchemaClasses.getBoolean((String)portTypesIter.next())) {
portTypesIter.remove();
}
}
arr.put(oper);
} catch (Exception | NoClassDefFoundError ex) {
// ignore this class
final String cls = clazz;
failed.put(cls, ex.toString());
}
}
json.put("operatorClasses", arr);
json.put("portClassHier", portClassHier);
json.put("portTypesWithSchemaClasses", portTypesWithSchemaClasses);
if (failed.length() > 0) {
json.put("failedOperators", failed);
}
printJson(json);
} finally {
FileUtils.deleteDirectory(tmpDir);
}
}
}
private class GetJarOperatorPropertiesCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
String files = expandCommaSeparatedFiles(args[1]);
if (files == null) {
throw new CliException("File " + args[1] + " is not found");
}
String[] jarFiles = files.split(",");
File tmpDir = copyToLocal(jarFiles);
try {
OperatorDiscoverer operatorDiscoverer = new OperatorDiscoverer(jarFiles);
Class<? extends Operator> operatorClass = operatorDiscoverer.getOperatorClass(args[2]);
printJson(operatorDiscoverer.describeOperator(operatorClass.getName()));
} finally {
FileUtils.deleteDirectory(tmpDir);
}
}
}
private class DumpPropertiesFileCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
String outfilename = expandFileName(args[1], false);
if (args.length > 3) {
String jarfile = args[2];
String appName = args[3];
Configuration config = StramClientUtils.addDTSiteResources(new Configuration());
StramAppLauncher submitApp = getStramAppLauncher(jarfile, config, false);
submitApp.loadDependencies();
List<AppFactory> matchingAppFactories = getMatchingAppFactories(submitApp, appName, true);
if (matchingAppFactories == null || matchingAppFactories.isEmpty()) {
throw new CliException("No application in jar file matches '" + appName + "'");
} else if (matchingAppFactories.size() > 1) {
throw new CliException("More than one application in jar file match '" + appName + "'");
} else {
AppFactory appFactory = matchingAppFactories.get(0);
LogicalPlan logicalPlan = appFactory.createApp(submitApp.getLogicalPlanConfiguration());
File file = new File(outfilename);
if (!file.exists()) {
file.createNewFile();
}
LogicalPlanSerializer.convertToProperties(logicalPlan).save(file);
}
} else {
if (currentApp == null) {
throw new CliException("No application selected");
}
JSONObject response = getResource(StramWebServices.PATH_LOGICAL_PLAN, currentApp);
File file = new File(outfilename);
if (!file.exists()) {
file.createNewFile();
}
LogicalPlanSerializer.convertToProperties(response).save(file);
}
System.out.println("Property file is saved at " + outfilename);
}
}
private class CreateOperatorCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
String operatorName = args[1];
String className = args[2];
CreateOperatorRequest request = new CreateOperatorRequest();
request.setOperatorName(operatorName);
request.setOperatorFQCN(className);
logicalPlanRequestQueue.add(request);
}
}
private class RemoveOperatorCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
String operatorName = args[1];
RemoveOperatorRequest request = new RemoveOperatorRequest();
request.setOperatorName(operatorName);
logicalPlanRequestQueue.add(request);
}
}
private class CreateStreamCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
String streamName = args[1];
String sourceOperatorName = args[2];
String sourcePortName = args[3];
String sinkOperatorName = args[4];
String sinkPortName = args[5];
CreateStreamRequest request = new CreateStreamRequest();
request.setStreamName(streamName);
request.setSourceOperatorName(sourceOperatorName);
request.setSinkOperatorName(sinkOperatorName);
request.setSourceOperatorPortName(sourcePortName);
request.setSinkOperatorPortName(sinkPortName);
logicalPlanRequestQueue.add(request);
}
}
private class AddStreamSinkCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
String streamName = args[1];
String sinkOperatorName = args[2];
String sinkPortName = args[3];
AddStreamSinkRequest request = new AddStreamSinkRequest();
request.setStreamName(streamName);
request.setSinkOperatorName(sinkOperatorName);
request.setSinkOperatorPortName(sinkPortName);
logicalPlanRequestQueue.add(request);
}
}
private class RemoveStreamCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
String streamName = args[1];
RemoveStreamRequest request = new RemoveStreamRequest();
request.setStreamName(streamName);
logicalPlanRequestQueue.add(request);
}
}
private class SetOperatorAttributeCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
String operatorName = args[1];
String attributeName = args[2];
String attributeValue = args[3];
SetOperatorAttributeRequest request = new SetOperatorAttributeRequest();
request.setOperatorName(operatorName);
request.setAttributeName(attributeName);
request.setAttributeValue(attributeValue);
logicalPlanRequestQueue.add(request);
}
}
private class SetStreamAttributeCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
String streamName = args[1];
String attributeName = args[2];
String attributeValue = args[3];
SetStreamAttributeRequest request = new SetStreamAttributeRequest();
request.setStreamName(streamName);
request.setAttributeName(attributeName);
request.setAttributeValue(attributeValue);
logicalPlanRequestQueue.add(request);
}
}
private class SetPortAttributeCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
String operatorName = args[1];
String attributeName = args[2];
String attributeValue = args[3];
SetPortAttributeRequest request = new SetPortAttributeRequest();
request.setOperatorName(operatorName);
request.setAttributeName(attributeName);
request.setAttributeValue(attributeValue);
logicalPlanRequestQueue.add(request);
}
}
private class AbortCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
logicalPlanRequestQueue.clear();
changingLogicalPlan = false;
reader.setHistory(topLevelHistory);
}
}
private class SubmitCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
if (logicalPlanRequestQueue.isEmpty()) {
throw new CliException("Nothing to submit. Type \"abort\" to abort change");
}
StramAgent.StramUriSpec uriSpec = new StramAgent.StramUriSpec();
uriSpec = uriSpec.path(StramWebServices.PATH_LOGICAL_PLAN);
try {
final Map<String, Object> m = new HashMap<>();
ObjectMapper mapper = new ObjectMapper();
m.put("requests", logicalPlanRequestQueue);
final JSONObject jsonRequest = new JSONObject(mapper.writeValueAsString(m));
JSONObject response = getResource(uriSpec, currentApp, new WebServicesClient.WebServicesHandler<JSONObject>()
{
@Override
public JSONObject process(WebResource.Builder webResource, Class<JSONObject> clazz)
{
return webResource.accept(MediaType.APPLICATION_JSON).post(JSONObject.class, jsonRequest);
}
});
printJson(response);
} catch (Exception e) {
throw new CliException("Failed web service request for appid " + currentApp.getApplicationId().toString(), e);
}
logicalPlanRequestQueue.clear();
changingLogicalPlan = false;
reader.setHistory(topLevelHistory);
}
}
private class ShowQueueCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
printJson(logicalPlanRequestQueue, "queue");
if (consolePresent) {
System.out.println("Total operations in queue: " + logicalPlanRequestQueue.size());
}
}
}
private class BeginMacroCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
String name = args[1];
if (macros.containsKey(name) || aliases.containsKey(name)) {
System.err.println("Name '" + name + "' already exists.");
return;
}
try {
List<String> commands = new ArrayList<>();
while (true) {
String line;
if (consolePresent) {
line = reader.readLine("macro def (" + name + ") > ");
} else {
line = reader.readLine("", (char)0);
}
if (line.equals("end")) {
macros.put(name, commands);
updateCompleter(reader);
if (consolePresent) {
System.out.println("Macro '" + name + "' created.");
}
return;
} else if (line.equals("abort")) {
System.err.println("Aborted");
return;
} else {
commands.add(line);
}
}
} catch (IOException ex) {
System.err.println("Aborted");
}
}
}
private class SetPagerCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
if (args[1].equals("off")) {
pagerCommand = null;
} else if (args[1].equals("on")) {
if (consolePresent) {
pagerCommand = "less -F -X -r";
}
} else {
throw new CliException("set-pager parameter is either on or off.");
}
}
}
private class GetAppInfoCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
ApplicationReport appReport;
if (args.length > 1) {
appReport = getApplication(args[1]);
if (appReport == null) {
throw new CliException("Streaming application with id " + args[1] + " is not found.");
}
} else {
if (currentApp == null) {
throw new CliException("No application selected");
}
// refresh the state in currentApp
currentApp = yarnClient.getApplicationReport(currentApp.getApplicationId());
appReport = currentApp;
}
JSONObject response;
try {
response = getResource(StramWebServices.PATH_INFO, currentApp);
} catch (Exception ex) {
response = new JSONObject();
response.put("startTime", appReport.getStartTime());
response.put("id", appReport.getApplicationId().toString());
response.put("name", appReport.getName());
response.put("user", appReport.getUser());
}
response.put("state", appReport.getYarnApplicationState().name());
response.put("trackingUrl", appReport.getTrackingUrl());
response.put("finalStatus", appReport.getFinalApplicationStatus());
JSONArray tags = new JSONArray();
for (String tag : appReport.getApplicationTags()) {
tags.put(tag);
}
response.put("tags", tags);
printJson(response);
}
}
private class GetContainerStackTrace implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
String containerLongId = getContainerLongId(args[1]);
if (containerLongId == null) {
throw new CliException("Container " + args[1] + " not found");
}
JSONObject response;
try {
response = getResource(StramWebServices.PATH_PHYSICAL_PLAN_CONTAINERS + "/" + args[1] + "/" + StramWebServices.PATH_STACKTRACE, currentApp);
} catch (Exception ex) {
throw new CliException("Webservice call to AppMaster failed.", ex);
}
printJson(response);
}
}
private class GetAppPackageInfoCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
String[] tmpArgs = new String[args.length - 2];
System.arraycopy(args, 2, tmpArgs, 0, args.length - 2);
GetAppPackageInfoCommandLineInfo commandLineInfo = getGetAppPackageInfoCommandLineInfo(tmpArgs);
try (AppPackage ap = newAppPackageInstance(new URI(args[1]), true)) {
JSONSerializationProvider jomp = new JSONSerializationProvider();
jomp.addSerializer(PropertyInfo.class,
new AppPackage.PropertyInfoSerializer(commandLineInfo.provideDescription));
JSONObject apInfo = new JSONObject(jomp.getContext(null).writeValueAsString(ap));
apInfo.remove("name");
printJson(apInfo);
}
}
}
private void checkConfigPackageCompatible(AppPackage ap, ConfigPackage cp)
{
if (cp == null) {
return;
}
String requiredAppPackageName = cp.getAppPackageName();
String requiredAppPackageGroupId = cp.getAppPackageGroupId();
if (requiredAppPackageName != null && !requiredAppPackageName.equals(ap.getAppPackageName())) {
throw new CliException("Config package requires an app package name of \"" + requiredAppPackageName + "\". The app package given has the name of \"" + ap.getAppPackageName() + "\"");
}
if (requiredAppPackageGroupId != null && !requiredAppPackageGroupId.equals(ap.getAppPackageGroupId())) {
throw new CliException("Config package requires an app package group id of \"" + requiredAppPackageGroupId +
"\". The app package given has the groupId of \"" + ap.getAppPackageGroupId() + "\"");
}
String requiredAppPackageMinVersion = cp.getAppPackageMinVersion();
if (requiredAppPackageMinVersion != null && VersionInfo.compare(requiredAppPackageMinVersion, ap.getAppPackageVersion()) > 0) {
throw new CliException("Config package requires an app package minimum version of \"" + requiredAppPackageMinVersion + "\". The app package given is of version \"" + ap.getAppPackageVersion() + "\"");
}
String requiredAppPackageMaxVersion = cp.getAppPackageMaxVersion();
if (requiredAppPackageMaxVersion != null && VersionInfo.compare(requiredAppPackageMaxVersion, ap.getAppPackageVersion()) < 0) {
throw new CliException("Config package requires an app package maximum version of \"" + requiredAppPackageMaxVersion + "\". The app package given is of version \"" + ap.getAppPackageVersion() + "\"");
}
}
private void checkPlatformCompatible(AppPackage ap)
{
String apVersion = ap.getDtEngineVersion();
VersionInfo actualVersion = VersionInfo.APEX_VERSION;
if (!VersionInfo.isCompatible(actualVersion.getVersion(), apVersion)) {
throw new CliException("This App Package is compiled with Apache Apex Core API version " + apVersion + ", which is incompatible with this Apex Core version " + actualVersion.getVersion());
}
}
private void launchAppPackage(AppPackage ap, ConfigPackage cp, LaunchCommandLineInfo commandLineInfo, ConsoleReader reader) throws Exception
{
new LaunchCommand().execute(getLaunchAppPackageArgs(ap, cp, commandLineInfo, reader), reader);
}
String[] getLaunchAppPackageArgs(AppPackage ap, ConfigPackage cp, LaunchCommandLineInfo commandLineInfo, ConsoleReader reader) throws Exception
{
String matchAppName = null;
if (commandLineInfo.args.length > 1) {
matchAppName = commandLineInfo.args[1];
}
List<AppInfo> applications = new ArrayList<>(getAppsFromPackageAndConfig(ap, cp, commandLineInfo.useConfigApps));
if (matchAppName != null) {
Iterator<AppInfo> it = applications.iterator();
while (it.hasNext()) {
AppInfo ai = it.next();
if ((commandLineInfo.exactMatch && !ai.name.equals(matchAppName))
|| !ai.name.toLowerCase().matches(".*" + matchAppName.toLowerCase() + ".*")) {
it.remove();
}
}
}
AppInfo selectedApp = null;
if (applications.isEmpty()) {
throw new CliException("No applications in Application Package" + (matchAppName != null ? " matching \"" + matchAppName + "\"" : ""));
} else if (applications.size() == 1) {
selectedApp = applications.get(0);
} else {
//Store the appNames sorted in alphabetical order and their position in matchingAppFactories list
TreeMap<String, Integer> appNamesInAlphabeticalOrder = new TreeMap<>();
// Display matching applications
for (int i = 0; i < applications.size(); i++) {
String appName = applications.get(i).name;
appNamesInAlphabeticalOrder.put(appName, i);
}
//Create a mapping between the app display number and original index at matchingAppFactories
int index = 1;
HashMap<Integer, Integer> displayIndexToOriginalUnsortedIndexMap = new HashMap<>();
for (Map.Entry<String, Integer> entry : appNamesInAlphabeticalOrder.entrySet()) {
//Map display number of the app to original unsorted index
displayIndexToOriginalUnsortedIndexMap.put(index, entry.getValue());
//Display the app names
System.out.printf("%3d. %s\n", index++, entry.getKey());
}
// Exit if not in interactive mode
if (!consolePresent) {
throw new CliException("More than one application in Application Package match '" + matchAppName + "'");
} else {
boolean useHistory = reader.isHistoryEnabled();
reader.setHistoryEnabled(false);
History previousHistory = reader.getHistory();
History dummyHistory = new MemoryHistory();
reader.setHistory(dummyHistory);
List<Completer> completers = new ArrayList<>(reader.getCompleters());
for (Completer c : completers) {
reader.removeCompleter(c);
}
reader.setHandleUserInterrupt(true);
String optionLine;
try {
optionLine = reader.readLine("Choose application: ");
} finally {
reader.setHandleUserInterrupt(false);
reader.setHistoryEnabled(useHistory);
reader.setHistory(previousHistory);
for (Completer c : completers) {
reader.addCompleter(c);
}
}
try {
int option = Integer.parseInt(optionLine);
if (0 < option && option <= applications.size()) {
int appIndex = displayIndexToOriginalUnsortedIndexMap.get(option);
selectedApp = applications.get(appIndex);
}
} catch (Exception ex) {
// ignore
}
}
}
if (selectedApp == null) {
throw new CliException("No application selected");
}
DTConfiguration launchProperties = getLaunchAppPackageProperties(ap, cp, commandLineInfo, selectedApp.name);
String appFile = ap.tempDirectory() + "/app/" + selectedApp.file;
List<String> launchArgs = new ArrayList<>();
launchArgs.add("launch");
launchArgs.add("-exactMatch");
List<String> absClassPath = new ArrayList<>(ap.getClassPath());
for (int i = 0; i < absClassPath.size(); i++) {
String path = absClassPath.get(i);
if (!path.startsWith("/")) {
absClassPath.set(i, ap.tempDirectory() + "/" + path);
}
}
if (cp != null) {
StringBuilder files = new StringBuilder();
for (String file : cp.getClassPath()) {
if (files.length() != 0) {
files.append(',');
}
files.append(cp.tempDirectory()).append(File.separatorChar).append(file);
}
if (!StringUtils.isBlank(files.toString())) {
if (commandLineInfo.libjars != null) {
commandLineInfo.libjars = files.toString() + "," + commandLineInfo.libjars;
} else {
commandLineInfo.libjars = files.toString();
}
}
files.setLength(0);
for (String file : cp.getFiles()) {
if (files.length() != 0) {
files.append(',');
}
files.append(cp.tempDirectory()).append(File.separatorChar).append(file);
}
if (!StringUtils.isBlank(files.toString())) {
if (commandLineInfo.files != null) {
commandLineInfo.files = files.toString() + "," + commandLineInfo.files;
} else {
commandLineInfo.files = files.toString();
}
}
}
StringBuilder libjarsVal = new StringBuilder();
if (!absClassPath.isEmpty() || commandLineInfo.libjars != null) {
if (!absClassPath.isEmpty()) {
libjarsVal.append(org.apache.commons.lang3.StringUtils.join(absClassPath, ','));
}
if (commandLineInfo.libjars != null) {
if (libjarsVal.length() > 0) {
libjarsVal.append(",");
}
libjarsVal.append(commandLineInfo.libjars);
}
}
if (appFile.endsWith(".json") || appFile.endsWith(".properties")) {
if (libjarsVal.length() > 0) {
libjarsVal.append(",");
}
libjarsVal.append(ap.tempDirectory()).append("/app/*.jar");
}
if (libjarsVal.length() > 0) {
launchArgs.add("-libjars");
launchArgs.add(libjarsVal.toString());
}
File launchPropertiesFile = new File(ap.tempDirectory(), "launch.xml");
launchProperties.writeToFile(launchPropertiesFile, "");
launchArgs.add("-conf");
launchArgs.add(launchPropertiesFile.getCanonicalPath());
if (commandLineInfo.localMode) {
launchArgs.add("-local");
}
if (commandLineInfo.archives != null) {
launchArgs.add("-archives");
launchArgs.add(commandLineInfo.archives);
}
if (commandLineInfo.files != null) {
launchArgs.add("-files");
launchArgs.add(commandLineInfo.files);
}
if (commandLineInfo.origAppId != null) {
launchArgs.add("-originalAppId");
launchArgs.add(commandLineInfo.origAppId);
}
if (commandLineInfo.queue != null) {
launchArgs.add("-queue");
launchArgs.add(commandLineInfo.queue);
}
if (commandLineInfo.tags != null) {
launchArgs.add("-tags");
launchArgs.add(commandLineInfo.tags);
}
launchArgs.add(appFile);
if (!appFile.endsWith(".json") && !appFile.endsWith(".properties")) {
launchArgs.add(selectedApp.name);
}
LOG.debug("Launch command: {}", StringUtils.join(launchArgs, " "));
return launchArgs.toArray(new String[]{});
}
DTConfiguration getLaunchAppPackageProperties(AppPackage ap, ConfigPackage cp, LaunchCommandLineInfo commandLineInfo, String appName) throws Exception
{
DTConfiguration launchProperties = new DTConfiguration();
List<AppInfo> applications = getAppsFromPackageAndConfig(ap, cp, commandLineInfo.useConfigApps);
AppInfo selectedApp = null;
for (AppInfo app : applications) {
if (app.name.equals(appName)) {
selectedApp = app;
break;
}
}
Map<String, PropertyInfo> defaultProperties = selectedApp == null ? ap.getDefaultProperties() : selectedApp.defaultProperties;
Set<String> requiredProperties = new TreeSet<>(selectedApp == null ? ap.getRequiredProperties() : selectedApp.requiredProperties);
for (Map.Entry<String, PropertyInfo> entry : defaultProperties.entrySet()) {
launchProperties.set(entry.getKey(), entry.getValue().getValue(), Scope.TRANSIENT, entry.getValue().getDescription());
requiredProperties.remove(entry.getKey());
}
// settings specified in the user's environment take precedence over defaults in package.
// since both are merged into a single -conf option below, apply them on top of the defaults here.
File confFile = new File(StramClientUtils.getUserDTDirectory(), StramClientUtils.DT_SITE_XML_FILE);
if (confFile.exists()) {
Configuration userConf = new Configuration(false);
userConf.addResource(new Path(confFile.toURI()));
Iterator<Entry<String, String>> it = userConf.iterator();
while (it.hasNext()) {
Entry<String, String> entry = it.next();
// filter relevant entries
String key = entry.getKey();
if (key.startsWith(StreamingApplication.DT_PREFIX)
|| key.startsWith(StreamingApplication.APEX_PREFIX)) {
launchProperties.set(key, entry.getValue(), Scope.TRANSIENT, null);
requiredProperties.remove(key);
}
}
}
if (commandLineInfo.apConfigFile != null) {
DTConfiguration givenConfig = new DTConfiguration();
givenConfig.loadFile(new File(ap.tempDirectory() + "/conf/" + commandLineInfo.apConfigFile));
for (Map.Entry<String, String> entry : givenConfig) {
launchProperties.set(entry.getKey(), entry.getValue(), Scope.TRANSIENT, null);
requiredProperties.remove(entry.getKey());
}
}
if (cp != null) {
Map<String, String> properties = cp.getProperties(appName);
for (Map.Entry<String, String> entry : properties.entrySet()) {
launchProperties.set(entry.getKey(), entry.getValue(), Scope.TRANSIENT, null);
requiredProperties.remove(entry.getKey());
}
} else if (commandLineInfo.configFile != null) {
DTConfiguration givenConfig = new DTConfiguration();
givenConfig.loadFile(new File(commandLineInfo.configFile));
for (Map.Entry<String, String> entry : givenConfig) {
launchProperties.set(entry.getKey(), entry.getValue(), Scope.TRANSIENT, null);
requiredProperties.remove(entry.getKey());
}
}
if (commandLineInfo.overrideProperties != null) {
for (Map.Entry<String, String> entry : commandLineInfo.overrideProperties.entrySet()) {
launchProperties.set(entry.getKey(), entry.getValue(), Scope.TRANSIENT, null);
requiredProperties.remove(entry.getKey());
}
}
// now look at whether it is in default configuration
for (Map.Entry<String, String> entry : conf) {
if (StringUtils.isNotBlank(entry.getValue())) {
requiredProperties.remove(entry.getKey());
}
}
if (!requiredProperties.isEmpty()) {
throw new CliException("Required properties not set: " + StringUtils.join(requiredProperties, ", "));
}
//StramClientUtils.evalProperties(launchProperties);
return launchProperties;
}
private List<AppInfo> getAppsFromPackageAndConfig(AppPackage ap, ConfigPackage cp, String configApps)
{
if (cp == null || configApps == null || !(configApps.equals(CONFIG_INCLUSIVE) || configApps.equals(CONFIG_EXCLUSIVE))) {
return ap.getApplications();
}
File src = new File(cp.tempDirectory(), "app");
File dest = new File(ap.tempDirectory(), "app");
if (!src.exists()) {
return ap.getApplications();
}
if (configApps.equals(CONFIG_EXCLUSIVE)) {
for (File file : dest.listFiles()) {
if (file.getName().endsWith(".json")) {
FileUtils.deleteQuietly(new File(dest, file.getName()));
}
}
} else {
for (File file : src.listFiles()) {
FileUtils.deleteQuietly(new File(dest, file.getName()));
}
}
for (File file : src.listFiles()) {
try {
FileUtils.moveFileToDirectory(file, dest, true);
} catch (IOException e) {
LOG.warn("Application from the config file {} failed while processing.", file.getName());
}
}
try {
FileUtils.deleteDirectory(src);
} catch (IOException e) {
LOG.warn("Failed to delete the Config Apps folder");
}
ap.processAppDirectory(configApps.equals(CONFIG_EXCLUSIVE));
return ap.getApplications();
}
private class GetAppPackageOperatorsCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
String[] tmpArgs = new String[args.length - 1];
System.arraycopy(args, 1, tmpArgs, 0, args.length - 1);
GetOperatorClassesCommandLineInfo commandLineInfo = getGetOperatorClassesCommandLineInfo(tmpArgs);
try (AppPackage ap = newAppPackageInstance(new URI(commandLineInfo.args[0]), true)) {
List<String> newArgs = new ArrayList<>();
List<String> jars = new ArrayList<>();
for (String jar : ap.getAppJars()) {
jars.add(ap.tempDirectory() + "/app/" + jar);
}
for (String libJar : ap.getClassPath()) {
jars.add(ap.tempDirectory() + "/" + libJar);
}
newArgs.add("get-jar-operator-classes");
if (commandLineInfo.parent != null) {
newArgs.add("-parent");
newArgs.add(commandLineInfo.parent);
}
newArgs.add(StringUtils.join(jars, ","));
for (int i = 1; i < commandLineInfo.args.length; i++) {
newArgs.add(commandLineInfo.args[i]);
}
LOG.debug("Executing: " + newArgs);
new GetJarOperatorClassesCommand().execute(newArgs.toArray(new String[]{}), reader);
}
}
}
private class GetAppPackageOperatorPropertiesCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
try (AppPackage ap = newAppPackageInstance(new URI(args[1]), true)) {
List<String> newArgs = new ArrayList<>();
List<String> jars = new ArrayList<>();
for (String jar : ap.getAppJars()) {
jars.add(ap.tempDirectory() + "/app/" + jar);
}
for (String libJar : ap.getClassPath()) {
jars.add(ap.tempDirectory() + "/" + libJar);
}
newArgs.add("get-jar-operator-properties");
newArgs.add(StringUtils.join(jars, ","));
newArgs.add(args[2]);
new GetJarOperatorPropertiesCommand().execute(newArgs.toArray(new String[]{}), reader);
}
}
}
private enum AttributesType
{
APPLICATION, OPERATOR, PORT
}
private class ListDefaultAttributesCommand implements Command
{
private final AttributesType type;
protected ListDefaultAttributesCommand(@NotNull AttributesType type)
{
this.type = Preconditions.checkNotNull(type);
}
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
JSONObject result;
if (type == AttributesType.APPLICATION) {
result = TypeDiscoverer.getAppAttributes();
} else if (type == AttributesType.OPERATOR) {
result = TypeDiscoverer.getOperatorAttributes();
} else {
//get port attributes
result = TypeDiscoverer.getPortAttributes();
}
printJson(result);
}
}
private class CleanAppDirectoriesCommand implements Command
{
@Override
public void execute(String[] args, ConsoleReader reader) throws Exception
{
JSONObject result = new JSONObject();
JSONArray appArray = new JSONArray();
List<ApplicationReport> apps = StramClientUtils.cleanAppDirectories(yarnClient, conf, fs,
System.currentTimeMillis() - Long.valueOf(args[1]));
for (ApplicationReport app : apps) {
appArray.put(app.getApplicationId().toString());
}
result.put("applications", appArray);
printJson(result);
}
}
@SuppressWarnings("static-access")
public static class GetPhysicalPropertiesCommandLineOptions
{
final Options options = new Options();
final Option propertyName = add(OptionBuilder.withArgName("property name").hasArg().withDescription("The name of the property whose value needs to be retrieved").create("propertyName"));
final Option waitTime = add(OptionBuilder.withArgName("wait time").hasArg().withDescription("How long to wait to get the result").create("waitTime"));
private Option add(Option opt)
{
this.options.addOption(opt);
return opt;
}
}
private static GetPhysicalPropertiesCommandLineOptions GET_PHYSICAL_PROPERTY_OPTIONS = new GetPhysicalPropertiesCommandLineOptions();
@SuppressWarnings("static-access")
public static class LaunchCommandLineOptions
{
final Options options = new Options();
final Option local = add(new Option("local", "Run application in local mode."));
final Option configFile = add(OptionBuilder.withArgName("configuration file").hasArg().withDescription("Specify an application configuration file.").create("conf"));
final Option apConfigFile = add(OptionBuilder.withArgName("app package configuration file").hasArg().withDescription("Specify an application configuration file within the app package if launching an app package.").create("apconf"));
final Option defProperty = add(OptionBuilder.withArgName("property=value").hasArg().withDescription("Use value for given property.").create("D"));
final Option libjars = add(OptionBuilder.withArgName("comma separated list of libjars").hasArg().withDescription("Specify comma separated jar files or other resource files to include in the classpath.").create("libjars"));
final Option files = add(OptionBuilder.withArgName("comma separated list of files").hasArg().withDescription("Specify comma separated files to be copied on the compute machines.").create("files"));
final Option archives = add(OptionBuilder.withArgName("comma separated list of archives").hasArg().withDescription("Specify comma separated archives to be unarchived on the compute machines.").create("archives"));
final Option ignorePom = add(new Option("ignorepom", "Do not run maven to find the dependency"));
final Option originalAppID = add(OptionBuilder.withArgName("application id").hasArg().withDescription("Specify original application identifier for restart.").create("originalAppId"));
final Option exactMatch = add(new Option("exactMatch", "Only consider applications with exact app name"));
final Option queue = add(OptionBuilder.withArgName("queue name").hasArg().withDescription("Specify the queue to launch the application").create("queue"));
final Option tags = add(OptionBuilder.withArgName("comma separated tags").hasArg().withDescription("Specify the tags for the application").create("tags"));
final Option force = add(new Option("force", "Force launch the application. Do not check for compatibility"));
final Option useConfigApps = add(OptionBuilder.withArgName("inclusive or exclusive").hasArg().withDescription("\"inclusive\" - merge the apps in config and app package. \"exclusive\" - only show config package apps.").create("useConfigApps"));
private Option add(Option opt)
{
this.options.addOption(opt);
return opt;
}
}
private static LaunchCommandLineOptions LAUNCH_OPTIONS = new LaunchCommandLineOptions();
static LaunchCommandLineInfo getLaunchCommandLineInfo(String[] args) throws ParseException
{
CommandLineParser parser = new PosixParser();
LaunchCommandLineInfo result = new LaunchCommandLineInfo();
CommandLine line = parser.parse(LAUNCH_OPTIONS.options, args);
result.localMode = line.hasOption(LAUNCH_OPTIONS.local.getOpt());
result.configFile = line.getOptionValue(LAUNCH_OPTIONS.configFile.getOpt());
result.apConfigFile = line.getOptionValue(LAUNCH_OPTIONS.apConfigFile.getOpt());
result.ignorePom = line.hasOption(LAUNCH_OPTIONS.ignorePom.getOpt());
String[] defs = line.getOptionValues(LAUNCH_OPTIONS.defProperty.getOpt());
if (defs != null) {
result.overrideProperties = new HashMap<>();
for (String def : defs) {
int equal = def.indexOf('=');
if (equal < 0) {
result.overrideProperties.put(def, null);
} else {
result.overrideProperties.put(def.substring(0, equal), def.substring(equal + 1));
}
}
}
result.libjars = line.getOptionValue(LAUNCH_OPTIONS.libjars.getOpt());
result.archives = line.getOptionValue(LAUNCH_OPTIONS.archives.getOpt());
result.files = line.getOptionValue(LAUNCH_OPTIONS.files.getOpt());
result.queue = line.getOptionValue(LAUNCH_OPTIONS.queue.getOpt());
result.tags = line.getOptionValue(LAUNCH_OPTIONS.tags.getOpt());
result.args = line.getArgs();
result.origAppId = line.getOptionValue(LAUNCH_OPTIONS.originalAppID.getOpt());
result.exactMatch = line.hasOption("exactMatch");
result.force = line.hasOption("force");
result.useConfigApps = line.getOptionValue(LAUNCH_OPTIONS.useConfigApps.getOpt());
return result;
}
static class LaunchCommandLineInfo
{
boolean localMode;
boolean ignorePom;
String configFile;
String apConfigFile;
Map<String, String> overrideProperties;
String libjars;
String files;
String queue;
String tags;
String archives;
String origAppId;
boolean exactMatch;
boolean force;
String[] args;
String useConfigApps;
}
@SuppressWarnings("static-access")
public static Options getShowLogicalPlanCommandLineOptions()
{
Options options = new Options();
Option libjars = OptionBuilder.withArgName("comma separated list of jars").hasArg().withDescription("Specify comma separated jar/resource files to include in the classpath.").create("libjars");
Option ignorePom = new Option("ignorepom", "Do not run maven to find the dependency");
Option exactMatch = new Option("exactMatch", "Only consider exact match for app name");
options.addOption(libjars);
options.addOption(ignorePom);
options.addOption(exactMatch);
return options;
}
private static ShowLogicalPlanCommandLineInfo getShowLogicalPlanCommandLineInfo(String[] args) throws ParseException
{
CommandLineParser parser = new PosixParser();
ShowLogicalPlanCommandLineInfo result = new ShowLogicalPlanCommandLineInfo();
CommandLine line = parser.parse(getShowLogicalPlanCommandLineOptions(), args);
result.libjars = line.getOptionValue("libjars");
result.ignorePom = line.hasOption("ignorepom");
result.args = line.getArgs();
result.exactMatch = line.hasOption("exactMatch");
return result;
}
private static class ShowLogicalPlanCommandLineInfo
{
String libjars;
boolean ignorePom;
String[] args;
boolean exactMatch;
}
public void mainHelper() throws Exception
{
init();
run();
System.exit(lastCommandError ? 1 : 0);
}
public static void main(final String[] args) throws Exception
{
LoggerUtil.addAppenders();
final ApexCli shell = new ApexCli();
shell.preImpersonationInit(args);
String hadoopUserName = System.getenv("HADOOP_USER_NAME");
if (UserGroupInformation.isSecurityEnabled()
&& StringUtils.isNotBlank(hadoopUserName)
&& !hadoopUserName.equals(UserGroupInformation.getLoginUser().getUserName())) {
LOG.info("You ({}) are running as user {}", UserGroupInformation.getLoginUser().getUserName(), hadoopUserName);
UserGroupInformation ugi = UserGroupInformation.createProxyUser(hadoopUserName, UserGroupInformation.getLoginUser());
ugi.doAs(new PrivilegedExceptionAction<Void>()
{
@Override
public Void run() throws Exception
{
shell.mainHelper();
return null;
}
});
} else {
shell.mainHelper();
}
}
}