/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package com.datatorrent.stram.cli;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.lang.management.ManagementFactory;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.security.PrivilegedExceptionAction;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

import javax.validation.constraints.NotNull;
import javax.ws.rs.core.MediaType;

import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.log4j.Appender;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.tools.ant.DirectoryScanner;

import com.google.common.base.Preconditions;
import com.sun.jersey.api.client.WebResource;

import com.datatorrent.api.DAG.GenericOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.stram.StramUtils;
import com.datatorrent.stram.client.AppPackage;
import com.datatorrent.stram.client.AppPackage.AppInfo;
import com.datatorrent.stram.client.AppPackage.PropertyInfo;
import com.datatorrent.stram.client.ConfigPackage;
import com.datatorrent.stram.client.DTConfiguration;
import com.datatorrent.stram.client.DTConfiguration.Scope;
import com.datatorrent.stram.client.RecordingsAgent;
import com.datatorrent.stram.client.RecordingsAgent.RecordingInfo;
import com.datatorrent.stram.client.StramAgent;
import com.datatorrent.stram.client.StramAppLauncher;
import com.datatorrent.stram.client.StramAppLauncher.AppFactory;
import com.datatorrent.stram.client.StramClientUtils;
import com.datatorrent.stram.client.StramClientUtils.ClientRMHelper;
import com.datatorrent.stram.codec.LogicalPlanSerializer;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
import com.datatorrent.stram.plan.logical.requests.AddStreamSinkRequest;
import com.datatorrent.stram.plan.logical.requests.CreateOperatorRequest;
import com.datatorrent.stram.plan.logical.requests.CreateStreamRequest;
import com.datatorrent.stram.plan.logical.requests.LogicalPlanRequest;
import com.datatorrent.stram.plan.logical.requests.RemoveOperatorRequest;
import com.datatorrent.stram.plan.logical.requests.RemoveStreamRequest;
import com.datatorrent.stram.plan.logical.requests.SetOperatorAttributeRequest;
import com.datatorrent.stram.plan.logical.requests.SetOperatorPropertyRequest;
import com.datatorrent.stram.plan.logical.requests.SetPortAttributeRequest;
import com.datatorrent.stram.plan.logical.requests.SetStreamAttributeRequest;
import com.datatorrent.stram.security.StramUserLogin;
import com.datatorrent.stram.util.JSONSerializationProvider;
import com.datatorrent.stram.util.SecurityUtils;
import com.datatorrent.stram.util.VersionInfo;
import com.datatorrent.stram.util.WebServicesClient;
import com.datatorrent.stram.webapp.OperatorDiscoverer;
import com.datatorrent.stram.webapp.StramWebServices;
import com.datatorrent.stram.webapp.TypeDiscoverer;

import jline.console.ConsoleReader;
import jline.console.completer.AggregateCompleter;
import jline.console.completer.ArgumentCompleter;
import jline.console.completer.Completer;
import jline.console.completer.FileNameCompleter;
import jline.console.completer.StringsCompleter;
import jline.console.history.FileHistory;
import jline.console.history.History;
import jline.console.history.MemoryHistory;
import sun.misc.Signal;
import sun.misc.SignalHandler;

/**
 * Provides command line interface for a streaming application on hadoop (yarn)
 * <p>
 *
 * @since 0.3.2
 */
@SuppressWarnings("UseOfSystemOutOrSystemErr")
public class ApexCli
{
  private static final Logger LOG = LoggerFactory.getLogger(ApexCli.class);
  private static String CONFIG_EXCLUSIVE = "exclusive";
  private static String CONFIG_INCLUSIVE = "inclusive";

  private static final String COLOR_RED = "\033[38;5;196m";
  private static final String COLOR_YELLOW = "\033[0;93m";
  private static final String FORMAT_BOLD = "\033[1m";

  private static final String COLOR_RESET = "\033[0m";
  private static final String ITALICS = "\033[3m";
  private static final String APEX_HIGHLIGHT_COLOR_PROPERTY_NAME = "apex.cli.color.highlight";
  private static final String APEX_HIGHLIGHT_COLOR_ENV_VAR_NAME = "APEX_HIGHLIGHT_COLOR";

  protected Configuration conf;
  private FileSystem fs;
  private StramAgent stramAgent;
  private final YarnClient yarnClient = YarnClient.createYarnClient();
  private ApplicationReport currentApp = null;
  private boolean consolePresent;
  private String[] commandsToExecute;
  private final Map<String, CommandSpec> globalCommands = new TreeMap<>();
  private final Map<String, CommandSpec> connectedCommands = new TreeMap<>();
  private final Map<String, CommandSpec> logicalPlanChangeCommands = new TreeMap<>();
  private final Map<String, String> aliases = new HashMap<>();
  private final Map<String, List<String>> macros = new HashMap<>();
  private boolean changingLogicalPlan = false;
  private final List<LogicalPlanRequest> logicalPlanRequestQueue = new ArrayList<>();
  private FileHistory topLevelHistory;
  private FileHistory changingLogicalPlanHistory;
  private String jsonp;
  private boolean raw = false;
  private RecordingsAgent recordingsAgent;
  private final ObjectMapper mapper = new JSONSerializationProvider().getContext(null);
  private String pagerCommand;
  private Process pagerProcess;
  private int verboseLevel = 0;
  private final Tokenizer tokenizer = new Tokenizer();
  private final Map<String, String> variableMap = new HashMap<>();
  private static boolean lastCommandError = false;
  private Thread mainThread;
  private Thread commandThread;
  private String prompt;
  private String forcePrompt;
  private String kerberosPrincipal;
  private String kerberosKeyTab;
  private String highlightColor = null;

  private static class FileLineReader extends ConsoleReader
  {
    private final BufferedReader br;

    FileLineReader(String fileName) throws IOException
    {
      super();
      fileName = expandFileName(fileName, true);
      br = new BufferedReader(new FileReader(fileName));
    }

    @Override
    public String readLine(String prompt) throws IOException
    {
      return br.readLine();
    }

    @Override
    public String readLine(String prompt, Character mask) throws IOException
    {
      return br.readLine();
    }

    @Override
    public String readLine(Character mask) throws IOException
    {
      return br.readLine();
    }

    public void close() throws IOException
    {
      br.close();
    }

  }

  public class Tokenizer
  {
    private void appendToCommandBuffer(List<String> commandBuffer, StringBuffer buf, boolean potentialEmptyArg)
    {
      if (potentialEmptyArg || buf.length() > 0) {
        commandBuffer.add(buf.toString());
        buf.setLength(0);
      }
    }

    private List<String> startNewCommand(LinkedList<List<String>> resultBuffer)
    {
      List<String> newCommand = new ArrayList<>();
      if (!resultBuffer.isEmpty()) {
        List<String> lastCommand = resultBuffer.peekLast();
        if (lastCommand.size() == 1) {
          String first = lastCommand.get(0);
          if (first.matches("^[A-Za-z][A-Za-z0-9]*=.*")) {
            // This is a variable assignment
            int equalSign = first.indexOf('=');
            variableMap.put(first.substring(0, equalSign), first.substring(equalSign + 1));
            resultBuffer.removeLast();
          }
        }
      }
      resultBuffer.add(newCommand);
      return newCommand;
    }

    public List<String[]> tokenize(String commandLine)
    {
      LinkedList<List<String>> resultBuffer = new LinkedList<>();
      List<String> commandBuffer = startNewCommand(resultBuffer);

      if (commandLine != null) {
        commandLine = ltrim(commandLine);
        if (commandLine.startsWith("#")) {
          return null;
        }

        int len = commandLine.length();
        boolean insideQuotes = false;
        boolean potentialEmptyArg = false;
        StringBuffer buf = new StringBuffer(commandLine.length());

        for (@SuppressWarnings("AssignmentToForLoopParameter") int i = 0; i < len; ++i) {
          char c = commandLine.charAt(i);
          if (c == '"') {
            potentialEmptyArg = true;
            insideQuotes = !insideQuotes;
          } else if (c == '\\') {
            if (len > i + 1) {
              switch (commandLine.charAt(i + 1)) {
                case 'n':
                  buf.append("\n");
                  break;
                case 't':
                  buf.append("\t");
                  break;
                case 'r':
                  buf.append("\r");
                  break;
                case 'b':
                  buf.append("\b");
                  break;
                case 'f':
                  buf.append("\f");
                  break;
                default:
                  buf.append(commandLine.charAt(i + 1));
              }
              ++i;
            }
          } else {
            if (insideQuotes) {
              buf.append(c);
            } else {

              if (c == '$') {
                StringBuilder variableName = new StringBuilder(32);
                if (len > i + 1) {
                  if (commandLine.charAt(i + 1) == '{') {
                    ++i;
                    while (len > i + 1) {
                      char ch = commandLine.charAt(i + 1);
                      if (ch != '}') {
                        variableName.append(ch);
                      }
                      ++i;
                      if (ch == '}') {
                        break;
                      }
                      if (len <= i + 1) {
                        throw new CliException("Parse error: unmatched brace");
                      }
                    }
                  } else if (commandLine.charAt(i + 1) == '?') {
                    ++i;
                    buf.append(lastCommandError ? "1" : "0");
                    continue;
                  } else {
                    while (len > i + 1) {
                      char ch = commandLine.charAt(i + 1);
                      if ((variableName.length() > 0 && ch >= '0' && ch <= '9') || ((ch >= 'a' && ch <= 'z') || (ch >= 'A' && ch <= 'Z'))) {
                        variableName.append(ch);
                      } else {
                        break;
                      }
                      ++i;
                    }
                  }
                  if (variableName.length() == 0) {
                    buf.append(c);
                  } else {
                    String value = variableMap.get(variableName.toString());
                    if (value != null) {
                      buf.append(value);
                    }
                  }
                } else {
                  buf.append(c);
                }
              } else if (c == ';') {
                appendToCommandBuffer(commandBuffer, buf, potentialEmptyArg);
                commandBuffer = startNewCommand(resultBuffer);
              } else if (Character.isWhitespace(c)) {
                appendToCommandBuffer(commandBuffer, buf, potentialEmptyArg);
                potentialEmptyArg = false;
                if (len > i + 1 && commandLine.charAt(i + 1) == '#') {
                  break;
                }
              } else {
                buf.append(c);
              }
            }
          }
        }
        appendToCommandBuffer(commandBuffer, buf, potentialEmptyArg);
      }
      startNewCommand(resultBuffer);
      List<String[]> result = new ArrayList<>();
      for (List<String> command : resultBuffer) {
        String[] commandArray = new String[command.size()];
        result.add(command.toArray(commandArray));
      }
      return result;
    }

  }

  private interface Command
  {
    void execute(String[] args, ConsoleReader reader) throws Exception;

  }

  private static class Arg
  {
    final String name;

    Arg(String name)
    {
      this.name = name;
    }

    @Override
    public String toString()
    {
      return name;
    }

  }

  private static class FileArg extends Arg
  {
    FileArg(String name)
    {
      super(name);
    }

  }

  // VarArg must be in optional argument and must be at the end
  private static class VarArg extends Arg
  {
    VarArg(String name)
    {
      super(name);
    }
  }

  private static class CommandArg extends Arg
  {
    CommandArg(String name)
    {
      super(name);
    }

  }

  protected PrintStream suppressOutput()
  {
    PrintStream originalStream = System.out;
    if (raw) {
      PrintStream dummyStream = new PrintStream(new OutputStream()
      {
        @Override
        public void write(int b)
        {
          // no-op
        }

      });
      System.setOut(dummyStream);
    }
    return originalStream;
  }

  protected void restoreOutput(PrintStream originalStream)
  {
    if (raw) {
      System.setOut(originalStream);
    }
  }

  AppPackage newAppPackageInstance(URI uri, boolean suppressOutput) throws IOException
  {
    PrintStream outputStream = suppressOutput ? suppressOutput() : null;
    try {
      final String scheme = uri.getScheme();
      if (scheme == null || scheme.equals("file")) {
        return new AppPackage(new FileInputStream(new File(expandFileName(uri.getPath(), true))), true);
      } else {
        try (FileSystem fs = FileSystem.newInstance(uri, conf)) {
          return new AppPackage(fs.open(new Path(uri.getPath())), true);
        }
      }
    } finally {
      if (outputStream != null) {
        restoreOutput(outputStream);
      }
    }
  }

  AppPackage newAppPackageInstance(File f) throws IOException
  {
    PrintStream outputStream = suppressOutput();
    try {
      return new AppPackage(f, true);
    } finally {
      restoreOutput(outputStream);
    }
  }

  @SuppressWarnings("unused")
  private StramAppLauncher getStramAppLauncher(String jarfileUri, Configuration config, boolean ignorePom) throws Exception
  {
    URI uri = new URI(jarfileUri);
    String scheme = uri.getScheme();
    StramAppLauncher appLauncher = null;
    if (scheme == null || scheme.equals("file")) {
      File jf = new File(uri.getPath());
      appLauncher = new StramAppLauncher(jf, config);
    } else {
      try (FileSystem tmpFs = FileSystem.newInstance(uri, conf)) {
        Path path = new Path(uri.getPath());
        appLauncher = new StramAppLauncher(tmpFs, path, config);
      }
    }
    if (appLauncher != null) {
      if (verboseLevel > 0) {
        System.err.print(appLauncher.getMvnBuildClasspathOutput());
      }
      return appLauncher;
    } else {
      throw new CliException("Scheme " + scheme + " not supported.");
    }
  }

  private static class CommandSpec
  {
    Command command;
    Arg[] requiredArgs;
    Arg[] optionalArgs;
    String description;

    CommandSpec(Command command, Arg[] requiredArgs, Arg[] optionalArgs, String description)
    {
      this.command = command;
      this.requiredArgs = requiredArgs;
      this.optionalArgs = optionalArgs;
      this.description = description;
    }

    void verifyArguments(String[] args) throws CliException
    {
      int minArgs = 0;
      int maxArgs = 0;
      if (requiredArgs != null) {
        minArgs = requiredArgs.length;
        maxArgs = requiredArgs.length;
      }
      if (optionalArgs != null) {
        for (Arg arg : optionalArgs) {
          if (arg instanceof VarArg) {
            maxArgs = Integer.MAX_VALUE;
            break;
          } else {
            maxArgs++;
          }
        }
      }
      if (args.length - 1 < minArgs || args.length - 1 > maxArgs) {
        throw new CliException("Command parameter error");
      }
    }

    void printUsage(String cmd)
    {
      System.err.print("Usage: " + cmd);
      if (requiredArgs != null) {
        for (Arg arg : requiredArgs) {
          System.err.print(" <" + arg + ">");
        }
      }
      if (optionalArgs != null) {
        for (Arg arg : optionalArgs) {
          if (arg instanceof VarArg) {
            System.err.print(" [<" + arg + "> ... ]");
          } else {
            System.err.print(" [<" + arg + ">]");
          }
        }
      }
      System.err.println();
    }

  }

  private static class OptionsCommandSpec extends CommandSpec
  {
    Options options;

    OptionsCommandSpec(Command command, Arg[] requiredArgs, Arg[] optionalArgs, String description, Options options)
    {
      super(command, requiredArgs, optionalArgs, description);
      this.options = options;
    }

    @Override
    void verifyArguments(String[] args) throws CliException
    {
      try {
        args = new PosixParser().parse(options, args).getArgs();
        super.verifyArguments(args);
      } catch (Exception ex) {
        throw new CliException("Command parameter error");
      }
    }

    @Override
    void printUsage(String cmd)
    {
      super.printUsage(cmd + ((options == null) ? "" : " [options]"));
      if (options != null) {
        System.out.println("Options:");
        HelpFormatter formatter = new HelpFormatter();
        PrintWriter pw = new PrintWriter(System.out);
        formatter.printOptions(pw, 80, options, 4, 4);
        pw.flush();
      }
    }

  }

  ApexCli()
  {
    //
    // Global command specification starts here
    //
    globalCommands.put("help", new CommandSpec(new HelpCommand(),
        null,
        new Arg[]{new CommandArg("command")},
        "Show help"));
    globalCommands.put("echo", new CommandSpec(new EchoCommand(),
        null, new Arg[]{new VarArg("arg")},
        "Echo the arguments"));
    globalCommands.put("connect", new CommandSpec(new ConnectCommand(),
        new Arg[]{new Arg("app-id")},
        null,
        "Connect to an app"));
    globalCommands.put("launch", new OptionsCommandSpec(new LaunchCommand(),
        new Arg[]{},
        new Arg[]{new FileArg("jar-file/json-file/properties-file/app-package-file-path/app-package-file-uri"), new Arg("matching-app-name")},
        "Launch an app", LAUNCH_OPTIONS.options));
    globalCommands.put("shutdown-app", new CommandSpec(new ShutdownAppCommand(),
        new Arg[]{new Arg("app-id")},
        new Arg[]{new VarArg("app-id")},
        "Shutdown an app"));
    globalCommands.put("list-apps", new CommandSpec(new ListAppsCommand(),
        null,
        new Arg[]{new Arg("pattern")},
        "List applications"));
    globalCommands.put("kill-app", new CommandSpec(new KillAppCommand(),
        new Arg[]{new Arg("app-id/app-name")},
        new Arg[]{new VarArg("app-id/app-name")},
        "Kill an app"));
    globalCommands.put("show-logical-plan", new OptionsCommandSpec(new ShowLogicalPlanCommand(),
        new Arg[]{new FileArg("jar-file/app-package-file")},
        new Arg[]{new Arg("class-name")},
        "List apps in a jar or show logical plan of an app class",
        getShowLogicalPlanCommandLineOptions()));

    globalCommands.put("get-jar-operator-classes", new OptionsCommandSpec(new GetJarOperatorClassesCommand(),
        new Arg[]{new FileArg("jar-files-comma-separated")},
        new Arg[]{new Arg("search-term")},
        "List operators in a jar list",
        GET_OPERATOR_CLASSES_OPTIONS.options));

    globalCommands.put("get-jar-operator-properties", new CommandSpec(new GetJarOperatorPropertiesCommand(),
        new Arg[]{new FileArg("jar-files-comma-separated"), new Arg("operator-class-name")},
        null,
        "List properties in specified operator"));

    globalCommands.put("alias", new CommandSpec(new AliasCommand(),
        new Arg[]{new Arg("alias-name"), new CommandArg("command")},
        null,
        "Create a command alias"));
    globalCommands.put("source", new CommandSpec(new SourceCommand(),
        new Arg[]{new FileArg("file")},
        null,
        "Execute the commands in a file"));
    globalCommands.put("exit", new CommandSpec(new ExitCommand(),
        null,
        null,
        "Exit the CLI"));
    globalCommands.put("begin-macro", new CommandSpec(new BeginMacroCommand(),
        new Arg[]{new Arg("name")},
        null,
        "Begin Macro Definition ($1...$9 to access parameters and type 'end' to end the definition)"));
    globalCommands.put("dump-properties-file", new CommandSpec(new DumpPropertiesFileCommand(),
        new Arg[]{new FileArg("out-file"), new FileArg("jar-file"), new Arg("app-name")},
        null,
        "Dump the properties file of an app class"));
    globalCommands.put("get-app-info", new CommandSpec(new GetAppInfoCommand(),
        new Arg[]{new Arg("app-id")},
        null,
        "Get the information of an app"));
    globalCommands.put("set-pager", new CommandSpec(new SetPagerCommand(),
        new Arg[]{new Arg("on/off")},
        null,
        "Set the pager program for output"));
    globalCommands.put("get-config-parameter", new CommandSpec(new GetConfigParameterCommand(),
        null,
        new Arg[]{new FileArg("parameter-name")},
        "Get the configuration parameter"));
    globalCommands.put("get-app-package-info", new OptionsCommandSpec(new GetAppPackageInfoCommand(),
        new Arg[]{new FileArg("app-package-file-path/app-package-file-uri")},
        new Arg[]{new Arg("-withDescription")},
        "Get info on the app package file",
        GET_APP_PACKAGE_INFO_OPTIONS));
    globalCommands.put("get-app-package-operators", new OptionsCommandSpec(new GetAppPackageOperatorsCommand(),
        new Arg[]{new FileArg("app-package-file-path/app-package-file-uri")},
        new Arg[]{new Arg("search-term")},
        "Get operators within the given app package",
        GET_OPERATOR_CLASSES_OPTIONS.options));
    globalCommands.put("get-app-package-operator-properties", new CommandSpec(new GetAppPackageOperatorPropertiesCommand(),
        new Arg[]{new FileArg("app-package-file-path/app-package-file-uri"), new Arg("operator-class")},
        null,
        "Get operator properties within the given app package"));
    globalCommands.put("list-default-app-attributes", new CommandSpec(new ListDefaultAttributesCommand(AttributesType.APPLICATION),
        null, null, "Lists the default application attributes"));
    globalCommands.put("list-default-operator-attributes", new CommandSpec(new ListDefaultAttributesCommand(AttributesType.OPERATOR),
        null, null, "Lists the default operator attributes"));
    globalCommands.put("list-default-port-attributes", new CommandSpec(new ListDefaultAttributesCommand(AttributesType.PORT),
        null, null, "Lists the default port attributes"));
    globalCommands.put("clean-app-directories", new CommandSpec(new CleanAppDirectoriesCommand(),
        new Arg[]{new Arg("duration-in-millis")},
        null,
        "Clean up data directories of applications that terminated the given milliseconds ago"));

    //
    // Connected command specification starts here
    //
    connectedCommands.put("list-containers", new CommandSpec(new ListContainersCommand(),
        null,
        null,
        "List containers"));
    connectedCommands.put("list-operators", new CommandSpec(new ListOperatorsCommand(),
        null,
        new Arg[]{new Arg("pattern")},
        "List operators"));
    connectedCommands.put("show-physical-plan", new CommandSpec(new ShowPhysicalPlanCommand(),
        null,
        null,
        "Show physical plan"));
    connectedCommands.put("kill-container", new CommandSpec(new KillContainerCommand(),
        new Arg[]{new Arg("container-id")},
        new Arg[]{new VarArg("container-id")},
        "Kill a container"));
    connectedCommands.put("shutdown-app", new CommandSpec(new ShutdownAppCommand(),
        null,
        new Arg[]{new VarArg("app-id")},
        "Shutdown an app"));
    connectedCommands.put("kill-app", new CommandSpec(new KillAppCommand(),
        null,
        new Arg[]{new VarArg("app-id/app-name")},
        "Kill an app"));
    connectedCommands.put("wait", new CommandSpec(new WaitCommand(),
        new Arg[]{new Arg("timeout")},
        null,
        "Wait for completion of current application"));
    connectedCommands.put("start-recording", new CommandSpec(new StartRecordingCommand(),
        new Arg[]{new Arg("operator-id")},
        new Arg[]{new Arg("port-name"), new Arg("num-windows")},
        "Start recording"));
    connectedCommands.put("stop-recording", new CommandSpec(new StopRecordingCommand(),
        new Arg[]{new Arg("operator-id")},
        new Arg[]{new Arg("port-name")},
        "Stop recording"));
    connectedCommands.put("get-operator-attributes", new CommandSpec(new GetOperatorAttributesCommand(),
        new Arg[]{new Arg("operator-name")},
        new Arg[]{new Arg("attribute-name")},
        "Get attributes of an operator"));
    connectedCommands.put("get-operator-properties", new CommandSpec(new GetOperatorPropertiesCommand(),
        new Arg[]{new Arg("operator-name")},
        new Arg[]{new Arg("property-name")},
        "Get properties of a logical operator"));
    connectedCommands.put("get-physical-operator-properties", new OptionsCommandSpec(new GetPhysicalOperatorPropertiesCommand(),
        new Arg[]{new Arg("operator-id")},
        null,
        "Get properties of a physical operator", GET_PHYSICAL_PROPERTY_OPTIONS.options));

    connectedCommands.put("set-operator-property", new CommandSpec(new SetOperatorPropertyCommand(),
        new Arg[]{new Arg("operator-name"), new Arg("property-name"), new Arg("property-value")},
        null,
        "Set a property of an operator"));
    connectedCommands.put("set-physical-operator-property", new CommandSpec(new SetPhysicalOperatorPropertyCommand(),
        new Arg[]{new Arg("operator-id"), new Arg("property-name"), new Arg("property-value")},
        null,
        "Set a property of an operator"));
    connectedCommands.put("get-app-attributes", new CommandSpec(new GetAppAttributesCommand(),
        null,
        new Arg[]{new Arg("attribute-name")},
        "Get attributes of the connected app"));
    connectedCommands.put("get-port-attributes", new CommandSpec(new GetPortAttributesCommand(),
        new Arg[]{new Arg("operator-name"), new Arg("port-name")},
        new Arg[]{new Arg("attribute-name")},
        "Get attributes of a port"));
    connectedCommands.put("begin-logical-plan-change", new CommandSpec(new BeginLogicalPlanChangeCommand(),
        null,
        null,
        "Begin Logical Plan Change"));
    connectedCommands.put("show-logical-plan", new OptionsCommandSpec(new ShowLogicalPlanCommand(),
        null,
        new Arg[]{new FileArg("jar-file/app-package-file-path/app-package-file-uri"), new Arg("class-name")},
        "Show logical plan of an app class",
        getShowLogicalPlanCommandLineOptions()));
    connectedCommands.put("dump-properties-file", new CommandSpec(new DumpPropertiesFileCommand(),
        new Arg[]{new FileArg("out-file")},
        new Arg[]{new FileArg("jar-file"), new Arg("class-name")},
        "Dump the properties file of an app class"));
    connectedCommands.put("get-app-info", new CommandSpec(new GetAppInfoCommand(),
        null,
        new Arg[]{new Arg("app-id")},
        "Get the information of an app"));
    connectedCommands.put("get-recording-info", new CommandSpec(new GetRecordingInfoCommand(),
        null,
        new Arg[]{new Arg("operator-id"), new Arg("start-time")},
        "Get tuple recording info"));
    connectedCommands.put("get-container-stacktrace", new CommandSpec(new GetContainerStackTrace(),
        null,
        new Arg[]{new Arg("container-id")},
        "Get the stack trace for the container"));

    //
    // Logical plan change command specification starts here
    //
    logicalPlanChangeCommands.put("help", new CommandSpec(new HelpCommand(),
        null,
        new Arg[]{new Arg("command")},
        "Show help"));
    logicalPlanChangeCommands.put("create-operator", new CommandSpec(new CreateOperatorCommand(),
        new Arg[]{new Arg("operator-name"), new Arg("class-name")},
        null,
        "Create an operator"));
    logicalPlanChangeCommands.put("create-stream", new CommandSpec(new CreateStreamCommand(),
        new Arg[]{new Arg("stream-name"), new Arg("from-operator-name"), new Arg("from-port-name"), new Arg("to-operator-name"), new Arg("to-port-name")},
        null,
        "Create a stream"));
    logicalPlanChangeCommands.put("add-stream-sink", new CommandSpec(new AddStreamSinkCommand(),
        new Arg[]{new Arg("stream-name"), new Arg("to-operator-name"), new Arg("to-port-name")},
        null,
        "Add a sink to an existing stream"));
    logicalPlanChangeCommands.put("remove-operator", new CommandSpec(new RemoveOperatorCommand(),
        new Arg[]{new Arg("operator-name")},
        null,
        "Remove an operator"));
    logicalPlanChangeCommands.put("remove-stream", new CommandSpec(new RemoveStreamCommand(),
        new Arg[]{new Arg("stream-name")},
        null,
        "Remove a stream"));
    logicalPlanChangeCommands.put("set-operator-property", new CommandSpec(new SetOperatorPropertyCommand(),
        new Arg[]{new Arg("operator-name"), new Arg("property-name"), new Arg("property-value")},
        null,
        "Set a property of an operator"));
    logicalPlanChangeCommands.put("set-operator-attribute", new CommandSpec(new SetOperatorAttributeCommand(),
        new Arg[]{new Arg("operator-name"), new Arg("attr-name"), new Arg("attr-value")},
        null,
        "Set an attribute of an operator"));
    logicalPlanChangeCommands.put("set-port-attribute", new CommandSpec(new SetPortAttributeCommand(),
        new Arg[]{new Arg("operator-name"), new Arg("port-name"), new Arg("attr-name"), new Arg("attr-value")},
        null,
        "Set an attribute of a port"));
    logicalPlanChangeCommands.put("set-stream-attribute", new CommandSpec(new SetStreamAttributeCommand(),
        new Arg[]{new Arg("stream-name"), new Arg("attr-name"), new Arg("attr-value")},
        null,
        "Set an attribute of a stream"));
    logicalPlanChangeCommands.put("show-queue", new CommandSpec(new ShowQueueCommand(),
        null,
        null,
        "Show the queue of the plan change"));
    logicalPlanChangeCommands.put("submit", new CommandSpec(new SubmitCommand(),
        null,
        null,
        "Submit the plan change"));
    logicalPlanChangeCommands.put("abort", new CommandSpec(new AbortCommand(),
        null,
        null,
        "Abort the plan change"));
  }

  private void printJson(String json) throws IOException
  {
    PrintStream os = getOutputPrintStream();

    if (jsonp != null) {
      os.println(jsonp + "(" + json + ");");
    } else {
      os.println(json);
    }
    os.flush();
    closeOutputPrintStream(os);
  }

  private void printJson(JSONObject json) throws JSONException, IOException
  {
    printJson(raw ? json.toString() : json.toString(2));
  }

  private void printJson(JSONArray jsonArray, String name) throws JSONException, IOException
  {
    JSONObject json = new JSONObject();
    json.put(name, jsonArray);
    printJson(json);
  }

  private <K, V> void printJson(Map<K, V> map) throws IOException, JSONException
  {
    printJson(new JSONObject(mapper.writeValueAsString(map)));
  }

  private <T> void printJson(List<T> list, String name) throws IOException, JSONException
  {
    printJson(new JSONArray(mapper.writeValueAsString(list)), name);
  }

  private PrintStream getOutputPrintStream() throws IOException
  {
    if (pagerCommand == null) {
      pagerProcess = null;
      return System.out;
    } else {
      pagerProcess = Runtime.getRuntime().exec(new String[]{"sh", "-c",
        pagerCommand + " >/dev/tty"});
      return new PrintStream(pagerProcess.getOutputStream());
    }
  }

  private void closeOutputPrintStream(PrintStream os)
  {
    if (os != System.out) {
      os.close();
      try {
        pagerProcess.waitFor();
      } catch (InterruptedException ex) {
        LOG.debug("Interrupted");
      }
    }
  }

  private static String expandFileName(String fileName, boolean expandWildCard) throws IOException
  {
    if (fileName.matches("^[a-zA-Z]+:.*")) {
      // it's a URL
      return fileName;
    }

    // TODO: need to work with other users' home directory
    if (fileName.startsWith("~" + File.separator)) {
      fileName = System.getProperty("user.home") + fileName.substring(1);
    }
    fileName = new File(fileName).getCanonicalPath();
    //LOG.debug("Canonical path: {}", fileName);
    if (expandWildCard) {
      DirectoryScanner scanner = new DirectoryScanner();
      scanner.setIncludes(new String[]{fileName});
      scanner.scan();
      String[] files = scanner.getIncludedFiles();

      if (files.length == 0) {
        throw new CliException(fileName + " does not match any file");
      } else if (files.length > 1) {
        throw new CliException(fileName + " matches more than one file");
      }
      return files[0];
    } else {
      return fileName;
    }
  }

  private static String[] expandFileNames(String fileName) throws IOException
  {
    // TODO: need to work with other users
    if (fileName.matches("^[a-zA-Z]+:.*")) {
      // it's a URL
      return new String[]{fileName};
    }
    if (fileName.startsWith("~" + File.separator)) {
      fileName = System.getProperty("user.home") + fileName.substring(1);
    }
    fileName = new File(fileName).getCanonicalPath();
    LOG.debug("Canonical path: {}", fileName);
    DirectoryScanner scanner = new DirectoryScanner();
    scanner.setIncludes(new String[]{fileName});
    scanner.scan();
    return scanner.getIncludedFiles();
  }

  private static String expandCommaSeparatedFiles(String filenames) throws IOException
  {
    String[] entries = filenames.split(",");
    StringBuilder result = new StringBuilder(filenames.length());
    for (String entry : entries) {
      for (String file : expandFileNames(entry)) {
        if (result.length() > 0) {
          result.append(",");
        }
        result.append(file);
      }
    }
    if (result.length() == 0) {
      return null;
    }
    return result.toString();
  }

  protected ApplicationReport getApplicationByName(String appName)
  {
    if (appName == null) {
      throw new CliException("Invalid application name provided by user");
    }
    List<ApplicationReport> appList = getApplicationList();
    for (ApplicationReport ar : appList) {
      if ((ar.getName().equals(appName)) &&
          (ar.getYarnApplicationState() != YarnApplicationState.KILLED) &&
          (ar.getYarnApplicationState() != YarnApplicationState.FINISHED)) {
        LOG.debug("Application Name: {} Application ID: {} Application State: {}",
            ar.getName(), ar.getApplicationId().toString(), YarnApplicationState.FINISHED);
        return ar;
      }
    }
    return null;
  }

  protected ApplicationReport getApplication(String appId)
  {
    List<ApplicationReport> appList = getApplicationList();
    if (StringUtils.isNumeric(appId)) {
      int appSeq = Integer.parseInt(appId);
      for (ApplicationReport ar : appList) {
        if (ar.getApplicationId().getId() == appSeq) {
          return ar;
        }
      }
    } else {
      for (ApplicationReport ar : appList) {
        if (ar.getApplicationId().toString().equals(appId)) {
          return ar;
        }
      }
    }
    return null;
  }

  static class CliException extends RuntimeException
  {
    private static final long serialVersionUID = 1L;

    CliException(String msg, Throwable cause)
    {
      super(msg, cause);
    }

    CliException(String msg)
    {
      super(msg);
    }

  }

  public void preImpersonationInit(String[] args) throws IOException
  {
    Signal.handle(new Signal("INT"), new SignalHandler()
    {
      @Override
      public void handle(Signal sig)
      {
        System.out.println("^C");
        if (commandThread != null) {
          commandThread.interrupt();
          mainThread.interrupt();
        } else {
          System.out.print(prompt);
          System.out.flush();
        }
      }
    });
    consolePresent = (System.console() != null);
    Options options = new Options();
    options.addOption("e", true, "Commands are read from the argument");
    options.addOption("v", false, "Verbose mode level 1");
    options.addOption("vv", false, "Verbose mode level 2");
    options.addOption("vvv", false, "Verbose mode level 3");
    options.addOption("vvvv", false, "Verbose mode level 4");
    options.addOption("r", false, "JSON Raw mode");
    options.addOption("p", true, "JSONP padding function");
    options.addOption("h", false, "Print this help");
    options.addOption("f", true, "Use the specified prompt at all time");
    options.addOption("kp", true, "Use the specified kerberos principal");
    options.addOption("kt", true, "Use the specified kerberos keytab");

    CommandLineParser parser = new BasicParser();
    try {
      CommandLine cmd = parser.parse(options, args);
      if (cmd.hasOption("v")) {
        verboseLevel = 1;
      }
      if (cmd.hasOption("vv")) {
        verboseLevel = 2;
      }
      if (cmd.hasOption("vvv")) {
        verboseLevel = 3;
      }
      if (cmd.hasOption("vvvv")) {
        verboseLevel = 4;
      }
      if (cmd.hasOption("r")) {
        raw = true;
      }
      if (cmd.hasOption("e")) {
        commandsToExecute = cmd.getOptionValues("e");
        consolePresent = false;
      }
      if (cmd.hasOption("p")) {
        jsonp = cmd.getOptionValue("p");
      }
      if (cmd.hasOption("f")) {
        forcePrompt = cmd.getOptionValue("f");
      }
      if (cmd.hasOption("h")) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.printHelp(ApexCli.class.getSimpleName(), options);
        System.exit(0);
      }
      if (cmd.hasOption("kp")) {
        kerberosPrincipal = cmd.getOptionValue("kp");
      }
      if (cmd.hasOption("kt")) {
        kerberosKeyTab = cmd.getOptionValue("kt");
      }
    } catch (ParseException ex) {
      System.err.println("Invalid argument: " + ex);
      System.exit(1);
    }

    if (kerberosPrincipal == null && kerberosKeyTab != null) {
      System.err.println("Kerberos key tab is specified but not the kerberos principal. Please specify it using the -kp option.");
      System.exit(1);
    }
    if (kerberosPrincipal != null && kerberosKeyTab == null) {
      System.err.println("Kerberos principal is specified but not the kerberos key tab. Please specify it using the -kt option.");
      System.exit(1);
    }

    Level logLevel;
    switch (verboseLevel) {
      case 0:
        logLevel = Level.OFF;
        break;
      case 1:
        logLevel = Level.ERROR;
        break;
      case 2:
        logLevel = Level.WARN;
        break;
      case 3:
        logLevel = Level.INFO;
        break;
      default:
        logLevel = Level.DEBUG;
        break;
    }

    for (org.apache.log4j.Logger logger : new org.apache.log4j.Logger[]{
        org.apache.log4j.Logger.getRootLogger(),
        org.apache.log4j.Logger.getLogger(ApexCli.class)
    }) {

     /*
      * Override logLevel specified by user, the same logLevel would be inherited by all
      * appenders related to logger.
      */
      logger.setLevel(logLevel);

      @SuppressWarnings("unchecked")
      Enumeration<Appender> allAppenders = logger.getAllAppenders();
      while (allAppenders.hasMoreElements()) {
        Appender appender = allAppenders.nextElement();
        if (appender instanceof ConsoleAppender) {
          ((ConsoleAppender)appender).setThreshold(logLevel);
        }
      }
    }

    if (commandsToExecute != null) {
      for (String command : commandsToExecute) {
        LOG.debug("Command to be executed: {}", command);
      }
    }
    if (kerberosPrincipal != null && kerberosKeyTab != null) {
      StramUserLogin.authenticate(kerberosPrincipal, kerberosKeyTab);
    } else {
      Configuration config = new YarnConfiguration();
      StramClientUtils.addDTLocalResources(config);
      StramUserLogin.attemptAuthentication(config);
    }
  }

  /**
   * get highlight color based on env variable first and then config
   *
   */
  protected String getHighlightColor()
  {
    if (highlightColor == null) {
      highlightColor = System.getenv(APEX_HIGHLIGHT_COLOR_ENV_VAR_NAME);
      if (StringUtils.isBlank(highlightColor)) {
        highlightColor = conf.get(APEX_HIGHLIGHT_COLOR_PROPERTY_NAME, FORMAT_BOLD);
      }
      highlightColor = highlightColor.replace("\\e", "\033");
    }
    return highlightColor;
  }

  public void init() throws IOException
  {
    conf = StramClientUtils.addDTSiteResources(new YarnConfiguration());
    SecurityUtils.init(conf);
    fs = StramClientUtils.newFileSystemInstance(conf);
    stramAgent = new StramAgent(fs, conf);

    yarnClient.init(conf);
    yarnClient.start();
    LOG.debug("Yarn Client initialized and started");
    String socks = conf.get(CommonConfigurationKeysPublic.HADOOP_SOCKS_SERVER_KEY);
    if (socks != null) {
      int colon = socks.indexOf(':');
      if (colon > 0) {
        LOG.info("Using socks proxy at {}", socks);
        System.setProperty("socksProxyHost", socks.substring(0, colon));
        System.setProperty("socksProxyPort", socks.substring(colon + 1));
      }
    }
  }

  private void processSourceFile(String fileName, ConsoleReader reader) throws FileNotFoundException, IOException
  {
    fileName = expandFileName(fileName, true);
    LOG.debug("Sourcing {}", fileName);
    boolean consolePresentSaved = consolePresent;
    consolePresent = false;
    FileLineReader fr = null;
    String line;
    try {
      fr = new FileLineReader(fileName);
      while ((line = fr.readLine("")) != null) {
        processLine(line, fr, true);
      }
    } finally {
      consolePresent = consolePresentSaved;
      if (fr != null) {
        fr.close();
      }
    }
  }

  private static final class MyNullCompleter implements Completer
  {
    public static final MyNullCompleter INSTANCE = new MyNullCompleter();

    @Override
    public int complete(final String buffer, final int cursor, final List<CharSequence> candidates)
    {
      candidates.add("");
      return cursor;
    }

  }

  private static final class MyFileNameCompleter extends FileNameCompleter
  {
    @Override
    public int complete(final String buffer, final int cursor, final List<CharSequence> candidates)
    {
      int result = super.complete(buffer, cursor, candidates);
      if (candidates.isEmpty()) {
        candidates.add("");
        result = cursor;
      }
      return result;
    }

  }

  private List<Completer> defaultCompleters()
  {
    Map<String, CommandSpec> commands = new TreeMap<>();

    commands.putAll(logicalPlanChangeCommands);
    commands.putAll(connectedCommands);
    commands.putAll(globalCommands);

    List<Completer> completers = new LinkedList<>();
    for (Map.Entry<String, CommandSpec> entry : commands.entrySet()) {
      String command = entry.getKey();
      CommandSpec cs = entry.getValue();
      List<Completer> argCompleters = new LinkedList<>();
      argCompleters.add(new StringsCompleter(command));
      Arg[] args = (Arg[])ArrayUtils.addAll(cs.requiredArgs, cs.optionalArgs);
      if (args != null) {
        if (cs instanceof OptionsCommandSpec) {
          // ugly hack because jline cannot dynamically change completer while user types
          if (args[0] instanceof FileArg || args[0] instanceof VarArg) {
            for (int i = 0; i < 10; i++) {
              argCompleters.add(new MyFileNameCompleter());
            }
          }
        } else {
          for (Arg arg : args) {
            if (arg instanceof FileArg || arg instanceof VarArg) {
              argCompleters.add(new MyFileNameCompleter());
            } else if (arg instanceof CommandArg) {
              argCompleters.add(new StringsCompleter(commands.keySet().toArray(new String[]{})));
            } else {
              argCompleters.add(MyNullCompleter.INSTANCE);
            }
          }
        }
      }

      completers.add(new ArgumentCompleter(argCompleters));
    }

    List<Completer> argCompleters = new LinkedList<>();
    Set<String> set = new TreeSet<>();
    set.addAll(aliases.keySet());
    set.addAll(macros.keySet());
    argCompleters.add(new StringsCompleter(set.toArray(new String[]{})));
    for (int i = 0; i < 10; i++) {
      argCompleters.add(new MyFileNameCompleter());
    }
    completers.add(new ArgumentCompleter(argCompleters));
    return completers;
  }

  private void setupCompleter(ConsoleReader reader)
  {
    reader.addCompleter(new AggregateCompleter(defaultCompleters()));
  }

  private void updateCompleter(ConsoleReader reader)
  {
    List<Completer> completers = new ArrayList<>(reader.getCompleters());
    for (Completer c : completers) {
      reader.removeCompleter(c);
    }
    setupCompleter(reader);
  }

  private void setupHistory(ConsoleReader reader)
  {
    File historyFile = new File(StramClientUtils.getUserDTDirectory(), "cli_history");
    historyFile.getParentFile().mkdirs();
    try {
      topLevelHistory = new FileHistory(historyFile);
      reader.setHistory(topLevelHistory);
      historyFile = new File(StramClientUtils.getUserDTDirectory(), "cli_history_clp");
      changingLogicalPlanHistory = new FileHistory(historyFile);
    } catch (IOException ex) {
      System.err.printf("Unable to open %s for writing.", historyFile);
    }
  }

  private void setupAgents() throws IOException
  {
    recordingsAgent = new RecordingsAgent(stramAgent);
  }

  public void run() throws IOException
  {
    ConsoleReader reader = new ConsoleReader();
    reader.setExpandEvents(false);
    reader.setBellEnabled(false);
    try {
      processSourceFile(StramClientUtils.getConfigDir() + "/clirc_system", reader);
    } catch (Exception ex) {
      // ignore
    }
    try {
      processSourceFile(StramClientUtils.getUserDTDirectory() + "/clirc", reader);
    } catch (Exception ex) {
      // ignore
    }
    if (consolePresent) {
      printWelcomeMessage();
      setupCompleter(reader);
      setupHistory(reader);
      //reader.setHandleUserInterrupt(true);
    } else {
      reader.setEchoCharacter((char)0);
    }
    setupAgents();
    String line;
    PrintWriter out = new PrintWriter(System.out);
    int i = 0;
    while (true) {
      if (commandsToExecute != null) {
        if (i >= commandsToExecute.length) {
          break;
        }
        line = commandsToExecute[i++];
      } else {
        line = readLine(reader);
        if (line == null) {
          break;
        }
      }
      processLine(line, reader, true);
      out.flush();
    }
    if (topLevelHistory != null) {
      try {
        topLevelHistory.flush();
      } catch (IOException ex) {
        LOG.warn("Cannot flush command history", ex);
      }
    }
    if (changingLogicalPlanHistory != null) {
      try {
        changingLogicalPlanHistory.flush();
      } catch (IOException ex) {
        LOG.warn("Cannot flush command history", ex);
      }
    }
    if (consolePresent) {
      System.out.println("exit");
    }
  }

  private List<String> expandMacro(List<String> lines, String[] args)
  {
    List<String> expandedLines = new ArrayList<>();

    for (String line : lines) {
      int previousIndex = 0;
      StringBuilder expandedLine = new StringBuilder(line.length());
      while (true) {
        // Search for $0..$9 within the each line and replace by corresponding args
        int currentIndex = line.indexOf('$', previousIndex);
        if (currentIndex > 0 && line.length() > currentIndex + 1) {
          int argIndex = line.charAt(currentIndex + 1) - '0';
          if (args.length > argIndex && argIndex >= 0) {
            // Replace $0 with macro name or $1..$9 with input arguments
            expandedLine.append(line.substring(previousIndex, currentIndex)).append(args[argIndex]);
          } else if (argIndex >= 0 && argIndex <= 9) {
            // Arguments for $1..$9 were not supplied - replace with empty strings
            expandedLine.append(line.substring(previousIndex, currentIndex));
          } else {
            // Outside valid arguments range - ignore and do not replace
            expandedLine.append(line.substring(previousIndex, currentIndex + 2));
          }
          currentIndex += 2;
        } else {
          expandedLine.append(line.substring(previousIndex));
          expandedLines.add(expandedLine.toString());
          break;
        }
        previousIndex = currentIndex;
      }
    }
    return expandedLines;
  }

  private static String ltrim(String s)
  {
    int i = 0;
    while (i < s.length() && Character.isWhitespace(s.charAt(i))) {
      i++;
    }
    return s.substring(i);
  }

  private void processLine(String line, final ConsoleReader reader, boolean expandMacroAlias)
  {
    try {
      // clear interrupt flag
      Thread.interrupted();
      if (reader.isHistoryEnabled()) {
        History history = reader.getHistory();
        if (history instanceof FileHistory) {
          try {
            ((FileHistory)history).flush();
          } catch (IOException ex) {
            // ignore
          }
        }
      }
      //LOG.debug("line: \"{}\"", line);
      List<String[]> commands = tokenizer.tokenize(line);
      if (commands == null) {
        return;
      }
      for (final String[] args : commands) {
        if (args.length == 0 || StringUtils.isBlank(args[0])) {
          continue;
        }
        //LOG.debug("Got: {}", mapper.writeValueAsString(args));
        if (expandMacroAlias) {
          if (macros.containsKey(args[0])) {
            List<String> macroItems = expandMacro(macros.get(args[0]), args);
            for (String macroItem : macroItems) {
              if (consolePresent) {
                System.out.println("expanded-macro> " + macroItem);
              }
              processLine(macroItem, reader, false);
            }
            continue;
          }

          if (aliases.containsKey(args[0])) {
            processLine(aliases.get(args[0]), reader, false);
            continue;
          }
        }
        CommandSpec cs = null;
        if (changingLogicalPlan) {
          cs = logicalPlanChangeCommands.get(args[0]);
        } else {
          if (currentApp != null) {
            cs = connectedCommands.get(args[0]);
          }
          if (cs == null) {
            cs = globalCommands.get(args[0]);
          }
        }
        if (cs == null) {
          if (connectedCommands.get(args[0]) != null) {
            System.err.println("\"" + args[0] + "\" is valid only when connected to an application. Type \"connect <appid>\" to connect to an application.");
            lastCommandError = true;
          } else if (logicalPlanChangeCommands.get(args[0]) != null) {
            System.err.println("\"" + args[0] + "\" is valid only when changing a logical plan.  Type \"begin-logical-plan-change\" to change a logical plan");
            lastCommandError = true;
          } else {
            System.err.println("Invalid command '" + args[0] + "'. Type \"help\" for list of commands");
            lastCommandError = true;
          }
        } else {
          try {
            cs.verifyArguments(args);
          } catch (CliException ex) {
            cs.printUsage(args[0]);
            throw ex;
          }
          final Command command = cs.command;
          commandThread = new Thread()
          {
            @Override
            public void run()
            {
              try {
                command.execute(args, reader);
                lastCommandError = false;
              } catch (Exception e) {
                handleException(e);
              } catch (Error e) {
                handleException(e);
                System.err.println("Fatal error encountered");
                System.exit(1);
              }
            }

          };
          mainThread = Thread.currentThread();
          commandThread.start();
          try {
            commandThread.join();
          } catch (InterruptedException ex) {
            System.err.println("Interrupted");
          }
          commandThread = null;
        }
      }
    } catch (Exception e) {
      handleException(e);
    }
  }

  private void handleException(Throwable e)
  {
    System.err.println(ExceptionUtils.getFullStackTrace(e));
    LOG.error("Exception caught: ", e);
    lastCommandError = true;
  }

  private void printWelcomeMessage()
  {
    VersionInfo v = VersionInfo.APEX_VERSION;
    System.out.println("Apex CLI " + v.getVersion() + " " + v.getDate() + " " + v.getRevision());
  }

  private void printHelp(String command, CommandSpec commandSpec, PrintStream os)
  {
    if (consolePresent) {
      os.print(getHighlightColor());
      os.print(command);
      os.print(COLOR_RESET);
    } else {
      os.print(command);
    }
    if (commandSpec instanceof OptionsCommandSpec) {
      OptionsCommandSpec ocs = (OptionsCommandSpec)commandSpec;
      if (ocs.options != null) {
        os.print(" [options]");
      }
    }
    if (commandSpec.requiredArgs != null) {
      for (Arg arg : commandSpec.requiredArgs) {
        if (consolePresent) {
          os.print(" " + ITALICS + arg + COLOR_RESET);
        } else {
          os.print(" <" + arg + ">");
        }
      }
    }
    if (commandSpec.optionalArgs != null) {
      for (Arg arg : commandSpec.optionalArgs) {
        if (consolePresent) {
          os.print(" [" + ITALICS + arg + COLOR_RESET);
        } else {
          os.print(" [<" + arg + ">");
        }
        if (arg instanceof VarArg) {
          os.print(" ...");
        }
        os.print("]");
      }
    }
    os.println("\n\t" + commandSpec.description);
    if (commandSpec instanceof OptionsCommandSpec) {
      OptionsCommandSpec ocs = (OptionsCommandSpec)commandSpec;
      if (ocs.options != null) {
        os.println("\tOptions:");
        HelpFormatter formatter = new HelpFormatter();
        PrintWriter pw = new PrintWriter(os);
        formatter.printOptions(pw, 80, ocs.options, 12, 4);
        pw.flush();
      }
    }
  }

  private void printHelp(Map<String, CommandSpec> commandSpecs, PrintStream os)
  {
    for (Map.Entry<String, CommandSpec> entry : commandSpecs.entrySet()) {
      printHelp(entry.getKey(), entry.getValue(), os);
    }
  }

  private String readLine(ConsoleReader reader)
    throws IOException
  {
    if (forcePrompt == null) {
      prompt = "";
      if (consolePresent) {
        if (changingLogicalPlan) {
          prompt = "logical-plan-change";
        } else {
          prompt = "apex";
        }
        if (currentApp != null) {
          prompt += " (";
          prompt += currentApp.getApplicationId().toString();
          prompt += ") ";
        }
        prompt += "> ";
      }
    } else {
      prompt = forcePrompt;
    }
    String line = reader.readLine(prompt, consolePresent ? null : (char)0);
    if (line == null) {
      return null;
    }
    return ltrim(line);
  }

  private List<ApplicationReport> getApplicationList()
  {
    try {
      return StramUtils.getApexApplicationList(yarnClient);
    } catch (Exception e) {
      throw new CliException("Error getting application list from resource manager", e);
    }
  }

  private String getContainerLongId(String containerId)
  {
    JSONObject json = getResource(StramWebServices.PATH_PHYSICAL_PLAN_CONTAINERS, currentApp);
    int shortId = 0;
    if (StringUtils.isNumeric(containerId)) {
      shortId = Integer.parseInt(containerId);
    }
    try {
      Object containersObj = json.get("containers");
      JSONArray containers;
      if (containersObj instanceof JSONArray) {
        containers = (JSONArray)containersObj;
      } else {
        containers = new JSONArray();
        containers.put(containersObj);
      }
      if (containersObj != null) {
        for (int o = containers.length(); o-- > 0; ) {
          JSONObject container = containers.getJSONObject(o);
          String id = container.getString("id");
          if (id.equals(containerId) || (shortId != 0 && (id.endsWith("_" + shortId) || id.endsWith("0" + shortId)))) {
            return id;
          }
        }
      }
    } catch (JSONException ex) {
      // ignore
    }
    return null;
  }

  private ApplicationReport assertRunningApp(ApplicationReport app)
  {
    ApplicationReport r;
    try {
      r = yarnClient.getApplicationReport(app.getApplicationId());
      if (r.getYarnApplicationState() != YarnApplicationState.RUNNING) {
        String msg = String.format("Application %s not running (status %s)",
            r.getApplicationId().getId(), r.getYarnApplicationState());
        throw new CliException(msg);
      }
    } catch (YarnException rmExc) {
      throw new CliException("Unable to determine application status", rmExc);
    } catch (IOException rmExc) {
      throw new CliException("Unable to determine application status", rmExc);
    }
    return r;
  }

  private JSONObject getResource(String resourcePath, ApplicationReport appReport)
  {
    return getResource(new StramAgent.StramUriSpec().path(resourcePath), appReport, new WebServicesClient.GetWebServicesHandler<JSONObject>());
  }

  private JSONObject getResource(StramAgent.StramUriSpec uriSpec, ApplicationReport appReport)
  {
    return getResource(uriSpec, appReport, new WebServicesClient.GetWebServicesHandler<JSONObject>());
  }

  private JSONObject getResource(StramAgent.StramUriSpec uriSpec, ApplicationReport appReport, WebServicesClient.WebServicesHandler handler)
  {

    if (appReport == null) {
      throw new CliException("No application selected");
    }

    if (StringUtils.isEmpty(appReport.getTrackingUrl()) || appReport.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED) {
      appReport = null;
      throw new CliException("Application terminated");
    }

    WebServicesClient wsClient = new WebServicesClient();
    try {
      return stramAgent.issueStramWebRequest(wsClient, appReport.getApplicationId().toString(), uriSpec, handler);
    } catch (Exception e) {
      // check the application status as above may have failed due application termination etc.
      if (appReport == currentApp) {
        currentApp = assertRunningApp(appReport);
      }
      throw new CliException("Failed to request web service for appid " + appReport.getApplicationId().toString(), e);
    }
  }

  private List<AppFactory> getMatchingAppFactories(StramAppLauncher submitApp, String matchString, boolean exactMatch)
  {
    try {
      List<AppFactory> cfgList = submitApp.getBundledTopologies();

      if (cfgList.isEmpty()) {
        return null;
      } else if (matchString == null) {
        return cfgList;
      } else {
        List<AppFactory> result = new ArrayList<>();
        if (!exactMatch) {
          matchString = matchString.toLowerCase();
        }
        for (AppFactory ac : cfgList) {
          String appName = ac.getName();
          String appAlias = submitApp.getLogicalPlanConfiguration().getAppAlias(appName);
          if (exactMatch) {
            if (matchString.equals(appName) || matchString.equals(appAlias)) {
              result.add(ac);
            }
          } else if (appName.toLowerCase().contains(matchString) || (appAlias != null && appAlias.toLowerCase()
              .contains(matchString))) {
            result.add(ac);
          }
        }
        return result;
      }
    } catch (Exception ex) {
      LOG.warn("Caught Exception: ", ex);
      return null;
    }
  }

  /*
   * Below is the implementation of all commands
   */
  private class HelpCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      PrintStream os = getOutputPrintStream();
      if (args.length < 2) {
        os.println("GLOBAL COMMANDS EXCEPT WHEN CHANGING LOGICAL PLAN:\n");
        printHelp(globalCommands, os);
        os.println();
        os.println("COMMANDS WHEN CONNECTED TO AN APP (via connect <appid>) EXCEPT WHEN CHANGING LOGICAL PLAN:\n");
        printHelp(connectedCommands, os);
        os.println();
        os.println("COMMANDS WHEN CHANGING LOGICAL PLAN (via begin-logical-plan-change):\n");
        printHelp(logicalPlanChangeCommands, os);
        os.println();
      } else {
        if (args[1].equals("help")) {
          printHelp("help", globalCommands.get("help"), os);
        } else {
          boolean valid = false;
          CommandSpec cs = globalCommands.get(args[1]);
          if (cs != null) {
            os.println("This usage is valid except when changing logical plan");
            printHelp(args[1], cs, os);
            os.println();
            valid = true;
          }
          cs = connectedCommands.get(args[1]);
          if (cs != null) {
            os.println("This usage is valid when connected to an app except when changing logical plan");
            printHelp(args[1], cs, os);
            os.println();
            valid = true;
          }
          cs = logicalPlanChangeCommands.get(args[1]);
          if (cs != null) {
            os.println("This usage is only valid when changing logical plan (via begin-logical-plan-change)");
            printHelp(args[1], cs, os);
            os.println();
            valid = true;
          }
          if (!valid) {
            os.println("Help for \"" + args[1] + "\" does not exist.");
          }
        }
      }
      closeOutputPrintStream(os);
    }

  }

  private class EchoCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      for (int i = 1; i < args.length; i++) {
        if (i > 1) {
          System.out.print(" ");
        }
        System.out.print(args[i]);
      }
      System.out.println();
    }
  }

  private class ConnectCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      currentApp = getApplication(args[1]);
      if (currentApp == null) {
        throw new CliException("Streaming application with id " + args[1] + " is not found.");
      }
      LOG.debug("Selected {} with tracking url {}", currentApp.getApplicationId(), currentApp.getTrackingUrl());
      getResource(StramWebServices.PATH_INFO, currentApp);
      if (consolePresent) {
        System.out.println("Connected to application " + currentApp.getApplicationId());
      }
    }

  }

  private class LaunchCommand implements Command
  {
    @Override
    @SuppressWarnings("SleepWhileInLoop")
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      String[] newArgs = new String[args.length - 1];
      System.arraycopy(args, 1, newArgs, 0, args.length - 1);
      LaunchCommandLineInfo commandLineInfo = getLaunchCommandLineInfo(newArgs);

      if (commandLineInfo.configFile != null) {
        commandLineInfo.configFile = expandFileName(commandLineInfo.configFile, true);
      }

      // see if the given config file is a config package
      ConfigPackage cp = null;
      String requiredAppPackageName = null;
      try {
        cp = new ConfigPackage(new File(commandLineInfo.configFile));
        requiredAppPackageName = cp.getAppPackageName();
      } catch (Exception ex) {
        // fall through, it's not a config package
      }
      try {
        Configuration config;
        String configFile = cp == null ? commandLineInfo.configFile : null;
        try {
          config = StramAppLauncher.getOverriddenConfig(StramClientUtils.addDTSiteResources(new Configuration()), configFile, commandLineInfo.overrideProperties);
          if (commandLineInfo.libjars != null) {
            commandLineInfo.libjars = expandCommaSeparatedFiles(commandLineInfo.libjars);
            if (commandLineInfo.libjars != null) {
              config.set(StramAppLauncher.LIBJARS_CONF_KEY_NAME, commandLineInfo.libjars);
            }
          }
          if (commandLineInfo.files != null) {
            commandLineInfo.files = expandCommaSeparatedFiles(commandLineInfo.files);
            if (commandLineInfo.files != null) {
              config.set(StramAppLauncher.FILES_CONF_KEY_NAME, commandLineInfo.files);
            }
          }
          if (commandLineInfo.archives != null) {
            commandLineInfo.archives = expandCommaSeparatedFiles(commandLineInfo.archives);
            if (commandLineInfo.archives != null) {
              config.set(StramAppLauncher.ARCHIVES_CONF_KEY_NAME, commandLineInfo.archives);
            }
          }
          if (commandLineInfo.origAppId != null) {
            config.set(StramAppLauncher.ORIGINAL_APP_ID, commandLineInfo.origAppId);
          }
          config.set(StramAppLauncher.QUEUE_NAME, commandLineInfo.queue != null ? commandLineInfo.queue : "default");
          if (commandLineInfo.tags != null) {
            config.set(StramAppLauncher.TAGS, commandLineInfo.tags);
          }
        } catch (Exception ex) {
          throw new CliException("Error opening the config XML file: " + configFile, ex);
        }
        StramAppLauncher submitApp;
        AppFactory appFactory = null;
        String matchString = null;
        if (commandLineInfo.args.length == 0) {
          if (commandLineInfo.origAppId == null) {
            throw new CliException("Launch requires an APA or JAR file when not resuming a terminated application");
          }
          submitApp = new StramAppLauncher(fs, config);
          appFactory = submitApp.new RecoveryAppFactory();
        } else {
          String fileName = expandFileName(commandLineInfo.args[0], true);
          if (commandLineInfo.args.length >= 2) {
            matchString = commandLineInfo.args[1];
          }
          if (fileName.endsWith(".json")) {
            File file = new File(fileName);
            submitApp = new StramAppLauncher(file.getName(), config);
            appFactory = new StramAppLauncher.JsonFileAppFactory(file);
            if (matchString != null) {
              LOG.warn("Match string \"{}\" is ignored for launching applications specified in JSON", matchString);
            }
          } else if (fileName.endsWith(".properties")) {
            File file = new File(fileName);
            submitApp = new StramAppLauncher(file.getName(), config);
            appFactory = new StramAppLauncher.PropertyFileAppFactory(file);
            if (matchString != null) {
              LOG.warn("Match string \"{}\" is ignored for launching applications specified in properties file", matchString);
            }
          } else {
            // see if it's an app package
            AppPackage ap = null;
            try {
              ap = newAppPackageInstance(new URI(fileName), true);
            } catch (Exception ex) {
              // It's not an app package
              if (requiredAppPackageName != null) {
                throw new CliException("Config package requires an app package name of \"" + requiredAppPackageName + "\"");
              }
            }

            if (ap != null) {
              try {
                if (!commandLineInfo.force) {
                  checkPlatformCompatible(ap);
                  checkConfigPackageCompatible(ap, cp);
                }
                launchAppPackage(ap, cp, commandLineInfo, reader);
                return;
              } finally {
                IOUtils.closeQuietly(ap);
              }
            }
            submitApp = getStramAppLauncher(fileName, config, commandLineInfo.ignorePom);
          }
        }
        submitApp.loadDependencies();

        if (commandLineInfo.origAppId != null) {
          // ensure app is not running
          ApplicationReport ar = null;
          try {
            ar = getApplication(commandLineInfo.origAppId);
          } catch (Exception e) {
            // application (no longer) in the RM history, does not prevent restart from state in DFS
            LOG.debug("Cannot determine status of application {} {}", commandLineInfo.origAppId, ExceptionUtils.getMessage(e));
          }
          if (ar != null) {
            if (ar.getFinalApplicationStatus() == FinalApplicationStatus.UNDEFINED) {
              throw new CliException("Cannot relaunch non-terminated application: " + commandLineInfo.origAppId + " " + ar.getYarnApplicationState());
            }
            if (appFactory == null && matchString == null) {
              // skip selection if we can match application name from previous run
              List<AppFactory> matchingAppFactories = getMatchingAppFactories(submitApp, ar.getName(), commandLineInfo.exactMatch);
              for (AppFactory af : matchingAppFactories) {
                String appName = submitApp.getLogicalPlanConfiguration().getAppAlias(af.getName());
                if (appName == null) {
                  appName = af.getName();
                }
                // limit to exact match
                if (appName.equals(ar.getName())) {
                  appFactory = af;
                  break;
                }
              }
            }
          }
        }

        if (appFactory == null && matchString != null) {
          // attempt to interpret argument as property file - do we still need it?
          try {
            File file = new File(expandFileName(commandLineInfo.args[1], true));
            if (file.exists()) {
              if (commandLineInfo.args[1].endsWith(".properties")) {
                appFactory = new StramAppLauncher.PropertyFileAppFactory(file);
              } else if (commandLineInfo.args[1].endsWith(".json")) {
                appFactory = new StramAppLauncher.JsonFileAppFactory(file);
              }
            }
          } catch (Exception | NoClassDefFoundError ex) {
            // ignore
          }
        }

        if (appFactory == null) {
          List<AppFactory> matchingAppFactories = getMatchingAppFactories(submitApp, matchString, commandLineInfo.exactMatch);
          if (matchingAppFactories == null || matchingAppFactories.isEmpty()) {
            throw new CliException("No applications matching \"" + matchString + "\" bundled in jar.");
          } else if (matchingAppFactories.size() == 1) {
            appFactory = matchingAppFactories.get(0);
          } else if (matchingAppFactories.size() > 1) {

            //Store the appNames sorted in alphabetical order and their position in matchingAppFactories list
            TreeMap<String, Integer> appNamesInAlphabeticalOrder = new TreeMap<>();
            // Display matching applications
            for (int i = 0; i < matchingAppFactories.size(); i++) {
              String appName = matchingAppFactories.get(i).getName();
              String appAlias = submitApp.getLogicalPlanConfiguration().getAppAlias(appName);
              if (appAlias != null) {
                appName = appAlias;
              }
              appNamesInAlphabeticalOrder.put(appName, i);
            }

            //Create a mapping between the app display number and original index at matchingAppFactories
            int index = 1;
            HashMap<Integer, Integer> displayIndexToOriginalUnsortedIndexMap = new HashMap<>();
            for (Map.Entry<String, Integer> entry : appNamesInAlphabeticalOrder.entrySet()) {
              //Map display number of the app to original unsorted index
              displayIndexToOriginalUnsortedIndexMap.put(index, entry.getValue());

              //Display the app names
              System.out.printf("%3d. %s\n", index++, entry.getKey());
            }

            // Exit if not in interactive mode
            if (!consolePresent) {
              throw new CliException("More than one application in jar file match '" + matchString + "'");
            } else {

              boolean useHistory = reader.isHistoryEnabled();
              reader.setHistoryEnabled(false);
              History previousHistory = reader.getHistory();
              History dummyHistory = new MemoryHistory();
              reader.setHistory(dummyHistory);
              List<Completer> completers = new ArrayList<>(reader.getCompleters());
              for (Completer c : completers) {
                reader.removeCompleter(c);
              }
              reader.setHandleUserInterrupt(true);
              String optionLine;
              try {
                optionLine = reader.readLine("Choose application: ");
              } finally {
                reader.setHandleUserInterrupt(false);
                reader.setHistoryEnabled(useHistory);
                reader.setHistory(previousHistory);
                for (Completer c : completers) {
                  reader.addCompleter(c);
                }
              }
              try {
                int option = Integer.parseInt(optionLine);
                if (0 < option && option <= matchingAppFactories.size()) {
                  int appIndex = displayIndexToOriginalUnsortedIndexMap.get(option);
                  appFactory = matchingAppFactories.get(appIndex);
                }
              } catch (Exception ex) {
                // ignore
              }
            }
          }

        }

        if (appFactory != null) {
          if (!commandLineInfo.localMode) {
            // see whether there is an app with the same name and user name running
            String appName = config.get(LogicalPlanConfiguration.KEY_APPLICATION_NAME, appFactory.getName());
            ApplicationReport duplicateApp = StramClientUtils.getStartedAppInstanceByName(yarnClient, appName, UserGroupInformation.getLoginUser().getUserName(), null);
            if (duplicateApp != null) {
              throw new CliException("Application with the name \"" + duplicateApp.getName() + "\" already running under the current user \"" + duplicateApp.getUser() + "\". Please choose another name. You can change the name by setting " + LogicalPlanConfiguration.KEY_APPLICATION_NAME);
            }

            // This is for suppressing System.out printouts from applications so that the user of CLI will not be confused by those printouts
            PrintStream originalStream = suppressOutput();
            ApplicationId appId = null;
            try {
              if (raw) {
                PrintStream dummyStream = new PrintStream(new OutputStream()
                {
                  @Override
                  public void write(int b)
                  {
                    // no-op
                  }

                });
                System.setOut(dummyStream);
              }
              appId = submitApp.launchApp(appFactory);
              currentApp = yarnClient.getApplicationReport(appId);
            } finally {
              restoreOutput(originalStream);
            }
            if (appId != null) {
              printJson("{\"appId\": \"" + appId + "\"}");
            }
          } else {
            submitApp.runLocal(appFactory);
          }
        } else {
          System.err.println("No application specified.");
        }

      } finally {
        IOUtils.closeQuietly(cp);
      }
    }

  }

  private class GetConfigParameterCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      PrintStream os = getOutputPrintStream();
      if (args.length == 1) {
        Map<String, String> sortedMap = new TreeMap<>();
        for (Map.Entry<String, String> entry : conf) {
          sortedMap.put(entry.getKey(), entry.getValue());
        }
        for (Map.Entry<String, String> entry : sortedMap.entrySet()) {
          os.println(entry.getKey() + "=" + entry.getValue());
        }
      } else {
        String value = conf.get(args[1]);
        if (value != null) {
          os.println(value);
        }
      }
      closeOutputPrintStream(os);
    }

  }

  private class ShutdownAppCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      ApplicationReport[] apps;
      if (args.length == 1) {
        if (currentApp == null) {
          throw new CliException("No application selected");
        } else {
          apps = new ApplicationReport[]{currentApp};
        }
      } else {
        apps = new ApplicationReport[args.length - 1];
        for (int i = 1; i < args.length; i++) {
          apps[i - 1] = getApplication(args[i]);
          if (apps[i - 1] == null) {
            throw new CliException("Streaming application with id " + args[i] + " is not found.");
          }
        }
      }

      for (ApplicationReport app : apps) {
        try {
          JSONObject response = getResource(new StramAgent.StramUriSpec().path(StramWebServices.PATH_SHUTDOWN), app, new WebServicesClient.WebServicesHandler<JSONObject>()
          {
            @Override
            public JSONObject process(WebResource.Builder webResource, Class<JSONObject> clazz)
            {
              return webResource.accept(MediaType.APPLICATION_JSON).post(clazz, new JSONObject());
            }

          });
          if (consolePresent) {
            System.out.println("Shutdown requested: " + response);
          }
          currentApp = null;
        } catch (Exception e) {
          throw new CliException("Failed to request shutdown for appid " + app.getApplicationId().toString(), e);
        }
      }
    }

  }

  private class ListAppsCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      try {
        JSONArray jsonArray = new JSONArray();
        List<ApplicationReport> appList = getApplicationList();
        Collections.sort(appList, new Comparator<ApplicationReport>()
        {
          @Override
          public int compare(ApplicationReport o1, ApplicationReport o2)
          {
            return o1.getApplicationId().getId() - o2.getApplicationId().getId();
          }

        });
        int totalCnt = 0;
        int runningCnt = 0;

        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss Z");

        for (ApplicationReport ar : appList) {
          /*
           * This is inefficient, but what the heck, if this can be passed through the command line, can anyone notice slowness.
           */
          JSONObject jsonObj = new JSONObject();
          jsonObj.put("startTime", sdf.format(new java.util.Date(ar.getStartTime())));
          jsonObj.put("id", ar.getApplicationId().getId());
          jsonObj.put("name", ar.getName());
          jsonObj.put("state", ar.getYarnApplicationState().name());
          jsonObj.put("trackingUrl", ar.getTrackingUrl());
          jsonObj.put("finalStatus", ar.getFinalApplicationStatus());
          JSONArray tags = new JSONArray();
          for (String tag : ar.getApplicationTags()) {
            tags.put(tag);
          }
          jsonObj.put("tags", tags);

          totalCnt++;
          if (ar.getYarnApplicationState() == YarnApplicationState.RUNNING) {
            runningCnt++;
          }

          if (args.length > 1) {
            if (StringUtils.isNumeric(args[1])) {
              if (jsonObj.getString("id").equals(args[1])) {
                jsonArray.put(jsonObj);
                break;
              }
            } else {
              @SuppressWarnings("unchecked")
              Iterator<String> keys = jsonObj.keys();
              while (keys.hasNext()) {
                if (jsonObj.get(keys.next()).toString().toLowerCase().contains(args[1].toLowerCase())) {
                  jsonArray.put(jsonObj);
                  break;
                }
              }
            }
          } else {
            jsonArray.put(jsonObj);
          }
        }
        printJson(jsonArray, "apps");
        if (consolePresent) {
          System.out.println(runningCnt + " active, total " + totalCnt + " applications.");
        }
      } catch (Exception ex) {
        throw new CliException("Failed to retrieve application list", ex);
      }
    }

  }

  private class KillAppCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      if (args.length == 1) {
        if (currentApp == null) {
          throw new CliException("No application selected");
        } else {
          try {
            yarnClient.killApplication(currentApp.getApplicationId());
            currentApp = null;
          } catch (YarnException e) {
            throw new CliException("Failed to kill " + currentApp.getApplicationId(), e);
          }
        }
        if (consolePresent) {
          System.out.println("Kill app requested");
        }
        return;
      }

      ApplicationReport app = null;
      int i = 0;
      try {
        while (++i < args.length) {
          app = getApplication(args[i]);
          if (app == null) {

            /*
             * try once again with application name type.
             */
            app = getApplicationByName(args[i]);
            if (app == null) {
              throw new CliException("Streaming application with id or name " + args[i] + " is not found.");
            }
          }
          yarnClient.killApplication(app.getApplicationId());
          if (app == currentApp) {
            currentApp = null;
          }
        }
        if (consolePresent) {
          System.out.println("Kill app requested");
        }
      } catch (YarnException e) {
        throw new CliException("Failed to kill " + ((app == null || app.getApplicationId() == null) ? "unknown application" : app.getApplicationId()) + ". Aborting killing of any additional applications.", e);
      } catch (NumberFormatException nfe) {
        throw new CliException("Invalid application Id " + args[i], nfe);
      }
    }

  }

  private class AliasCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      if (args[1].equals(args[2])) {
        throw new CliException("Alias to itself!");
      }
      aliases.put(args[1], args[2]);
      if (consolePresent) {
        System.out.println("Alias " + args[1] + " created.");
      }
      updateCompleter(reader);
    }

  }

  private class SourceCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      processSourceFile(args[1], reader);
      if (consolePresent) {
        System.out.println("File " + args[1] + " sourced.");
      }
    }

  }

  private class ExitCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      if (topLevelHistory != null) {
        try {
          topLevelHistory.flush();
        } catch (IOException ex) {
          LOG.warn("Cannot flush command history");
        }
      }
      if (changingLogicalPlanHistory != null) {
        try {
          changingLogicalPlanHistory.flush();
        } catch (IOException ex) {
          LOG.warn("Cannot flush command history");
        }
      }
      System.exit(0);
    }

  }

  private class ListContainersCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      JSONObject json = getResource(StramWebServices.PATH_PHYSICAL_PLAN_CONTAINERS, currentApp);
      if (args.length == 1) {
        printJson(json);
      } else {
        Object containersObj = json.get("containers");
        JSONArray containers;
        if (containersObj instanceof JSONArray) {
          containers = (JSONArray)containersObj;
        } else {
          containers = new JSONArray();
          containers.put(containersObj);
        }
        if (containersObj == null) {
          System.out.println("No containers found!");
        } else {
          JSONArray resultContainers = new JSONArray();
          for (int o = containers.length(); o-- > 0; ) {
            JSONObject container = containers.getJSONObject(o);
            String id = container.getString("id");
            if (id != null && !id.isEmpty()) {
              for (int argc = args.length; argc-- > 1; ) {
                String s1 = "0" + args[argc];
                String s2 = "_" + args[argc];
                if (id.equals(args[argc]) || id.endsWith(s1) || id.endsWith(s2)) {
                  resultContainers.put(container);
                }
              }
            }
          }
          printJson(resultContainers, "containers");
        }
      }
    }

  }

  private class ListOperatorsCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      JSONObject json = getResource(StramWebServices.PATH_PHYSICAL_PLAN_OPERATORS, currentApp);

      if (args.length > 1) {
        String singleKey = "" + json.keys().next();
        JSONArray matches = new JSONArray();
        // filter operators
        JSONArray arr;
        Object obj = json.get(singleKey);
        if (obj instanceof JSONArray) {
          arr = (JSONArray)obj;
        } else {
          arr = new JSONArray();
          arr.put(obj);
        }
        for (int i = 0; i < arr.length(); i++) {
          JSONObject oper = arr.getJSONObject(i);
          if (StringUtils.isNumeric(args[1])) {
            if (oper.getString("id").equals(args[1])) {
              matches.put(oper);
              break;
            }
          } else {
            @SuppressWarnings("unchecked")
            Iterator<String> keys = oper.keys();
            while (keys.hasNext()) {
              if (oper.get(keys.next()).toString().toLowerCase().contains(args[1].toLowerCase())) {
                matches.put(oper);
                break;
              }
            }
          }
        }
        json.put(singleKey, matches);
      }

      printJson(json);
    }

  }

  private class ShowPhysicalPlanCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      try {
        printJson(getResource(StramWebServices.PATH_PHYSICAL_PLAN, currentApp));
      } catch (Exception e) {
        throw new CliException("Failed web service request for appid " + currentApp.getApplicationId().toString(), e);
      }
    }

  }

  private class KillContainerCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      for (int i = 1; i < args.length; i++) {
        String containerLongId = getContainerLongId(args[i]);
        if (containerLongId == null) {
          throw new CliException("Container " + args[i] + " not found");
        }
        try {
          StramAgent.StramUriSpec uriSpec = new StramAgent.StramUriSpec();
          uriSpec = uriSpec.path(StramWebServices.PATH_PHYSICAL_PLAN_CONTAINERS).path(URLEncoder.encode(containerLongId, "UTF-8")).path("kill");
          JSONObject response = getResource(uriSpec, currentApp, new WebServicesClient.WebServicesHandler<JSONObject>()
          {
            @Override
            public JSONObject process(WebResource.Builder webResource, Class<JSONObject> clazz)
            {
              return webResource.accept(MediaType.APPLICATION_JSON).post(clazz, new JSONObject());
            }

          });
          if (consolePresent) {
            System.out.println("Kill container requested: " + response);
          }
        } catch (Exception e) {
          throw new CliException("Failed web service request for appid " + currentApp.getApplicationId().toString(), e);
        }
      }
    }

  }

  private class WaitCommand implements Command
  {
    @Override
    public void execute(String[] args, final ConsoleReader reader) throws Exception
    {
      if (currentApp == null) {
        throw new CliException("No application selected");
      }
      int timeout = Integer.valueOf(args[1]);

      ClientRMHelper.AppStatusCallback cb = new ClientRMHelper.AppStatusCallback()
      {
        @Override
        public boolean exitLoop(ApplicationReport report)
        {
          System.out.println("current status is: " + report.getYarnApplicationState());
          try {
            if (reader.getInput().available() > 0) {
              return true;
            }
          } catch (IOException e) {
            LOG.error("Error checking for input.", e);
          }
          return false;
        }

      };

      try {
        ClientRMHelper clientRMHelper = new ClientRMHelper(yarnClient, conf);
        boolean result = clientRMHelper.waitForCompletion(currentApp.getApplicationId(), cb, timeout * 1000);
        if (!result) {
          System.err.println("Application terminated unsuccessfully.");
        }
      } catch (YarnException e) {
        throw new CliException("Failed to kill " + currentApp.getApplicationId(), e);
      }
    }

  }

  private class StartRecordingCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      String opId = args[1];
      String port = null;
      long numWindows = 0;
      if (args.length >= 3) {
        port = args[2];
      }
      if (args.length >= 4) {
        numWindows = Long.valueOf(args[3]);
      }
      printJson(recordingsAgent.startRecording(currentApp.getApplicationId().toString(), opId, port, numWindows));
    }

  }

  private class StopRecordingCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      String opId = args[1];
      String port = null;
      if (args.length == 3) {
        port = args[2];
      }
      printJson(recordingsAgent.stopRecording(currentApp.getApplicationId().toString(), opId, port));
    }

  }

  private class GetRecordingInfoCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      if (args.length <= 1) {
        List<RecordingInfo> recordingInfo = recordingsAgent.getRecordingInfo(currentApp.getApplicationId().toString());
        printJson(recordingInfo, "recordings");
      } else if (args.length <= 2) {
        String opId = args[1];
        List<RecordingInfo> recordingInfo = recordingsAgent.getRecordingInfo(currentApp.getApplicationId().toString(), opId);
        printJson(recordingInfo, "recordings");
      } else {
        String opId = args[1];
        String id = args[2];
        RecordingInfo recordingInfo = recordingsAgent.getRecordingInfo(currentApp.getApplicationId().toString(), opId, id);
        printJson(new JSONObject(mapper.writeValueAsString(recordingInfo)));
      }
    }

  }

  private class GetAppAttributesCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      if (currentApp == null) {
        throw new CliException("No application selected");
      }
      StramAgent.StramUriSpec uriSpec = new StramAgent.StramUriSpec();
      uriSpec = uriSpec.path(StramWebServices.PATH_LOGICAL_PLAN).path("attributes");
      if (args.length > 1) {
        uriSpec = uriSpec.queryParam("attributeName", args[1]);
      }
      try {
        JSONObject response = getResource(uriSpec, currentApp);
        printJson(response);
      } catch (Exception e) {
        throw new CliException("Failed web service request for appid " + currentApp.getApplicationId().toString(), e);
      }
    }

  }

  private class GetOperatorAttributesCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      if (currentApp == null) {
        throw new CliException("No application selected");
      }
      StramAgent.StramUriSpec uriSpec = new StramAgent.StramUriSpec();
      uriSpec = uriSpec.path(StramWebServices.PATH_LOGICAL_PLAN_OPERATORS).path(URLEncoder.encode(args[1], "UTF-8")).path("attributes");
      if (args.length > 2) {
        uriSpec = uriSpec.queryParam("attributeName", args[2]);
      }
      try {
        JSONObject response = getResource(uriSpec, currentApp);
        printJson(response);
      } catch (Exception e) {
        throw new CliException("Failed web service request for appid " + currentApp.getApplicationId().toString(), e);
      }
    }

  }

  private class GetPortAttributesCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      if (currentApp == null) {
        throw new CliException("No application selected");
      }
      StramAgent.StramUriSpec uriSpec = new StramAgent.StramUriSpec();
      uriSpec = uriSpec.path(StramWebServices.PATH_LOGICAL_PLAN_OPERATORS).path(URLEncoder.encode(args[1], "UTF-8")).path("ports").path(URLEncoder.encode(args[2], "UTF-8")).path("attributes");
      if (args.length > 3) {
        uriSpec = uriSpec.queryParam("attributeName", args[3]);
      }
      try {
        JSONObject response = getResource(uriSpec, currentApp);
        printJson(response);
      } catch (Exception e) {
        throw new CliException("Failed web service request for appid " + currentApp.getApplicationId().toString(), e);
      }
    }

  }

  private class GetOperatorPropertiesCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      if (currentApp == null) {
        throw new CliException("No application selected");
      }
      StramAgent.StramUriSpec uriSpec = new StramAgent.StramUriSpec();
      uriSpec = uriSpec.path(StramWebServices.PATH_LOGICAL_PLAN_OPERATORS).path(URLEncoder.encode(args[1], "UTF-8")).path("properties");
      if (args.length > 2) {
        uriSpec = uriSpec.queryParam("propertyName", args[2]);
      }
      try {
        JSONObject response = getResource(uriSpec, currentApp);
        printJson(response);
      } catch (Exception e) {
        throw new CliException("Failed web service request for appid " + currentApp.getApplicationId().toString(), e);
      }
    }

  }

  private class GetPhysicalOperatorPropertiesCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      if (currentApp == null) {
        throw new CliException("No application selected");
      }
      if (!NumberUtils.isDigits(args[1])) {
        throw new CliException("Operator ID must be a number");
      }
      String[] newArgs = new String[args.length - 1];
      System.arraycopy(args, 1, newArgs, 0, args.length - 1);
      PosixParser parser = new PosixParser();
      CommandLine line = parser.parse(GET_PHYSICAL_PROPERTY_OPTIONS.options, newArgs);
      String waitTime = line.getOptionValue(GET_PHYSICAL_PROPERTY_OPTIONS.waitTime.getOpt());
      String propertyName = line.getOptionValue(GET_PHYSICAL_PROPERTY_OPTIONS.propertyName.getOpt());
      StramAgent.StramUriSpec uriSpec = new StramAgent.StramUriSpec();
      uriSpec = uriSpec.path(StramWebServices.PATH_PHYSICAL_PLAN_OPERATORS).path(args[1]).path("properties");
      if (propertyName != null) {
        uriSpec = uriSpec.queryParam("propertyName", propertyName);
      }
      if (waitTime != null) {
        uriSpec = uriSpec.queryParam("waitTime", waitTime);
      }

      try {
        JSONObject response = getResource(uriSpec, currentApp);
        printJson(response);
      } catch (Exception e) {
        throw new CliException("Failed web service request for appid " + currentApp.getApplicationId().toString(), e);
      }
    }

  }

  private class SetOperatorPropertyCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      if (currentApp == null) {
        throw new CliException("No application selected");
      }
      if (changingLogicalPlan) {
        String operatorName = args[1];
        String propertyName = args[2];
        String propertyValue = args[3];
        SetOperatorPropertyRequest request = new SetOperatorPropertyRequest();
        request.setOperatorName(operatorName);
        request.setPropertyName(propertyName);
        request.setPropertyValue(propertyValue);
        logicalPlanRequestQueue.add(request);
      } else {
        StramAgent.StramUriSpec uriSpec = new StramAgent.StramUriSpec();
        uriSpec = uriSpec.path(StramWebServices.PATH_LOGICAL_PLAN_OPERATORS).path(URLEncoder.encode(args[1], "UTF-8")).path("properties");
        final JSONObject request = new JSONObject();
        request.put(args[2], args[3]);
        JSONObject response = getResource(uriSpec, currentApp, new WebServicesClient.WebServicesHandler<JSONObject>()
        {
          @Override
          public JSONObject process(WebResource.Builder webResource, Class<JSONObject> clazz)
          {
            return webResource.accept(MediaType.APPLICATION_JSON).post(JSONObject.class, request);
          }

        });
        printJson(response);
      }
    }

  }

  private class SetPhysicalOperatorPropertyCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      if (currentApp == null) {
        throw new CliException("No application selected");
      }
      if (!NumberUtils.isDigits(args[1])) {
        throw new CliException("Operator ID must be a number");
      }
      StramAgent.StramUriSpec uriSpec = new StramAgent.StramUriSpec();
      uriSpec = uriSpec.path(StramWebServices.PATH_PHYSICAL_PLAN_OPERATORS).path(args[1]).path("properties");
      final JSONObject request = new JSONObject();
      request.put(args[2], args[3]);
      JSONObject response = getResource(uriSpec, currentApp, new WebServicesClient.WebServicesHandler<JSONObject>()
      {
        @Override
        public JSONObject process(WebResource.Builder webResource, Class<JSONObject> clazz)
        {
          return webResource.accept(MediaType.APPLICATION_JSON).post(JSONObject.class, request);
        }

      });
      printJson(response);

    }

  }

  private class BeginLogicalPlanChangeCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      changingLogicalPlan = true;
      reader.setHistory(changingLogicalPlanHistory);
    }

  }

  private class ShowLogicalPlanCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      String[] newArgs = new String[args.length - 1];
      System.arraycopy(args, 1, newArgs, 0, args.length - 1);
      ShowLogicalPlanCommandLineInfo commandLineInfo = getShowLogicalPlanCommandLineInfo(newArgs);
      Configuration config = StramClientUtils.addDTSiteResources(new Configuration());
      if (commandLineInfo.libjars != null) {
        commandLineInfo.libjars = expandCommaSeparatedFiles(commandLineInfo.libjars);
        if (commandLineInfo.libjars != null) {
          config.set(StramAppLauncher.LIBJARS_CONF_KEY_NAME, commandLineInfo.libjars);
        }
      }

      if (commandLineInfo.args.length > 0) {
        // see if the first argument is actually an app package
        try (AppPackage ap = newAppPackageInstance(new URI(commandLineInfo.args[0]), false)) {
          new ShowLogicalPlanAppPackageCommand().execute(args, reader);
          return;
        } catch (Exception ex) {
          // fall through
        }

        String filename = expandFileName(commandLineInfo.args[0], true);
        if (commandLineInfo.args.length >= 2) {
          String appName = commandLineInfo.args[1];
          StramAppLauncher submitApp = getStramAppLauncher(filename, config, commandLineInfo.ignorePom);
          submitApp.loadDependencies();
          List<AppFactory> matchingAppFactories = getMatchingAppFactories(submitApp, appName, commandLineInfo.exactMatch);
          if (matchingAppFactories == null || matchingAppFactories.isEmpty()) {
            throw new CliException("No application in jar file matches '" + appName + "'");
          } else if (matchingAppFactories.size() > 1) {
            throw new CliException("More than one application in jar file match '" + appName + "'");
          } else {
            Map<String, Object> map = new HashMap<>();
            PrintStream originalStream = System.out;
            AppFactory appFactory = matchingAppFactories.get(0);
            try {
              if (raw) {
                PrintStream dummyStream = new PrintStream(new OutputStream()
                {
                  @Override
                  public void write(int b)
                  {
                    // no-op
                  }

                });
                System.setOut(dummyStream);
              }
              LogicalPlan logicalPlan = appFactory.createApp(submitApp.getLogicalPlanConfiguration());
              map.put("applicationName", appFactory.getName());
              map.put("logicalPlan", LogicalPlanSerializer.convertToMap(logicalPlan, false));
            } finally {
              if (raw) {
                System.setOut(originalStream);
              }
            }
            printJson(map);
          }
        } else {
          if (filename.endsWith(".json")) {
            File file = new File(filename);
            StramAppLauncher submitApp = new StramAppLauncher(file.getName(), config);
            AppFactory appFactory = new StramAppLauncher.JsonFileAppFactory(file);
            LogicalPlan logicalPlan = appFactory.createApp(submitApp.getLogicalPlanConfiguration());
            Map<String, Object> map = new HashMap<>();
            map.put("applicationName", appFactory.getName());
            map.put("logicalPlan", LogicalPlanSerializer.convertToMap(logicalPlan, false));
            printJson(map);
          } else if (filename.endsWith(".properties")) {
            File file = new File(filename);
            StramAppLauncher submitApp = new StramAppLauncher(file.getName(), config);
            AppFactory appFactory = new StramAppLauncher.PropertyFileAppFactory(file);
            LogicalPlan logicalPlan = appFactory.createApp(submitApp.getLogicalPlanConfiguration());
            Map<String, Object> map = new HashMap<>();
            map.put("applicationName", appFactory.getName());
            map.put("logicalPlan", LogicalPlanSerializer.convertToMap(logicalPlan, false));
            printJson(map);
          } else {
            StramAppLauncher submitApp = getStramAppLauncher(filename, config, commandLineInfo.ignorePom);
            submitApp.loadDependencies();
            List<Map<String, Object>> appList = new ArrayList<>();
            List<AppFactory> appFactoryList = submitApp.getBundledTopologies();
            for (AppFactory appFactory : appFactoryList) {
              Map<String, Object> m = new HashMap<>();
              m.put("name", appFactory.getName());
              appList.add(m);
            }
            printJson(appList, "applications");
          }
        }
      } else {
        if (currentApp == null) {
          throw new CliException("No application selected");
        }
        JSONObject response = getResource(StramWebServices.PATH_LOGICAL_PLAN, currentApp);
        printJson(response);
      }
    }

  }

  private class ShowLogicalPlanAppPackageCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      try (AppPackage ap = newAppPackageInstance(new URI(args[1]), true)) {
        List<AppInfo> applications = ap.getApplications();

        if (args.length >= 3) {
          for (AppInfo appInfo : applications) {
            if (args[2].equals(appInfo.name)) {
              Map<String, Object> map = new HashMap<>();
              map.put("applicationName", appInfo.name);
              if (appInfo.dag != null) {
                map.put("logicalPlan", LogicalPlanSerializer.convertToMap(appInfo.dag, false));
              }
              if (appInfo.error != null) {
                map.put("error", appInfo.error);
              }
              printJson(map);
            }
          }
        } else {
          List<Map<String, Object>> appList = new ArrayList<>();
          for (AppInfo appInfo : applications) {
            Map<String, Object> m = new HashMap<>();
            m.put("name", appInfo.name);
            m.put("type", appInfo.type);
            appList.add(m);
          }
          printJson(appList, "applications");
        }
      }
    }

  }

  private File copyToLocal(String[] files) throws IOException
  {
    File tmpDir = new File(System.getProperty("java.io.tmpdir") + "/datatorrent/" + ManagementFactory.getRuntimeMXBean().getName());
    tmpDir.mkdirs();
    for (int i = 0; i < files.length; i++) {
      try {
        URI uri = new URI(files[i]);
        String scheme = uri.getScheme();
        if (scheme == null || scheme.equals("file")) {
          files[i] = uri.getPath();
        } else {
          try (FileSystem tmpFs = FileSystem.newInstance(uri, conf)) {
            Path srcPath = new Path(uri.getPath());
            Path dstPath = new Path(tmpDir.getAbsolutePath(), String.valueOf(i) + srcPath.getName());
            tmpFs.copyToLocalFile(srcPath, dstPath);
            files[i] = dstPath.toUri().getPath();
          }
        }
      } catch (URISyntaxException ex) {
        throw new RuntimeException(ex);
      }
    }

    return tmpDir;
  }

  private static Options GET_APP_PACKAGE_INFO_OPTIONS = new Options();

  static {
    GET_APP_PACKAGE_INFO_OPTIONS
        .addOption(new Option("withDescription", false, "Get default properties with description"));
  }

  public static class GetOperatorClassesCommandLineOptions
  {
    final Options options = new Options();
    final Option parent = add(new Option("parent", true, "Specify the parent class for the operators"));

    private Option add(Option opt)
    {
      this.options.addOption(opt);
      return opt;
    }

  }

  private static GetOperatorClassesCommandLineOptions GET_OPERATOR_CLASSES_OPTIONS = new GetOperatorClassesCommandLineOptions();

  static class GetAppPackageInfoCommandLineInfo
  {
    boolean provideDescription;
  }

  static GetAppPackageInfoCommandLineInfo getGetAppPackageInfoCommandLineInfo(String[] args) throws ParseException
  {
    CommandLineParser parser = new PosixParser();
    GetAppPackageInfoCommandLineInfo result = new GetAppPackageInfoCommandLineInfo();
    CommandLine line = parser.parse(GET_APP_PACKAGE_INFO_OPTIONS, args);
    result.provideDescription = line.hasOption("withDescription");
    return result;
  }

  static class GetOperatorClassesCommandLineInfo
  {
    String parent;
    String[] args;
  }

  static GetOperatorClassesCommandLineInfo getGetOperatorClassesCommandLineInfo(String[] args) throws ParseException
  {
    CommandLineParser parser = new PosixParser();
    GetOperatorClassesCommandLineInfo result = new GetOperatorClassesCommandLineInfo();
    CommandLine line = parser.parse(GET_OPERATOR_CLASSES_OPTIONS.options, args);
    result.parent = line.getOptionValue("parent");
    result.args = line.getArgs();
    return result;
  }

  private class GetJarOperatorClassesCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      String[] newArgs = new String[args.length - 1];
      System.arraycopy(args, 1, newArgs, 0, args.length - 1);
      GetOperatorClassesCommandLineInfo commandLineInfo = getGetOperatorClassesCommandLineInfo(newArgs);
      String parentName = commandLineInfo.parent != null ? commandLineInfo.parent : GenericOperator.class.getName();
      String files = expandCommaSeparatedFiles(commandLineInfo.args[0]);
      if (files == null) {
        throw new CliException("File " + commandLineInfo.args[0] + " is not found");
      }
      String[] jarFiles = files.split(",");
      File tmpDir = copyToLocal(jarFiles);
      try {
        OperatorDiscoverer operatorDiscoverer = new OperatorDiscoverer(jarFiles);
        String searchTerm = commandLineInfo.args.length > 1 ? commandLineInfo.args[1] : null;
        Set<String> operatorClasses = operatorDiscoverer.getOperatorClasses(parentName, searchTerm);
        JSONObject json = new JSONObject();
        JSONArray arr = new JSONArray();
        JSONObject portClassHier = new JSONObject();
        JSONObject portTypesWithSchemaClasses = new JSONObject();

        JSONObject failed = new JSONObject();

        for (final String clazz : operatorClasses) {
          try {
            JSONObject oper = operatorDiscoverer.describeOperator(clazz);

            // add default value
            operatorDiscoverer.addDefaultValue(clazz, oper);

            // add class hierarchy info to portClassHier and fetch port types with schema classes
            operatorDiscoverer.buildAdditionalPortInfo(oper, portClassHier, portTypesWithSchemaClasses);

            Iterator portTypesIter = portTypesWithSchemaClasses.keys();
            while (portTypesIter.hasNext()) {
              if (!portTypesWithSchemaClasses.getBoolean((String)portTypesIter.next())) {
                portTypesIter.remove();
              }
            }

            arr.put(oper);
          } catch (Exception | NoClassDefFoundError ex) {
            // ignore this class
            final String cls = clazz;
            failed.put(cls, ex.toString());
          }
        }

        json.put("operatorClasses", arr);
        json.put("portClassHier", portClassHier);
        json.put("portTypesWithSchemaClasses", portTypesWithSchemaClasses);
        if (failed.length() > 0) {
          json.put("failedOperators", failed);
        }
        printJson(json);
      } finally {
        FileUtils.deleteDirectory(tmpDir);
      }
    }
  }

  private class GetJarOperatorPropertiesCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      String files = expandCommaSeparatedFiles(args[1]);
      if (files == null) {
        throw new CliException("File " + args[1] + " is not found");
      }
      String[] jarFiles = files.split(",");
      File tmpDir = copyToLocal(jarFiles);
      try {
        OperatorDiscoverer operatorDiscoverer = new OperatorDiscoverer(jarFiles);
        Class<? extends Operator> operatorClass = operatorDiscoverer.getOperatorClass(args[2]);
        printJson(operatorDiscoverer.describeOperator(operatorClass.getName()));
      } finally {
        FileUtils.deleteDirectory(tmpDir);
      }
    }

  }

  private class DumpPropertiesFileCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      String outfilename = expandFileName(args[1], false);

      if (args.length > 3) {
        String jarfile = args[2];
        String appName = args[3];
        Configuration config = StramClientUtils.addDTSiteResources(new Configuration());
        StramAppLauncher submitApp = getStramAppLauncher(jarfile, config, false);
        submitApp.loadDependencies();
        List<AppFactory> matchingAppFactories = getMatchingAppFactories(submitApp, appName, true);
        if (matchingAppFactories == null || matchingAppFactories.isEmpty()) {
          throw new CliException("No application in jar file matches '" + appName + "'");
        } else if (matchingAppFactories.size() > 1) {
          throw new CliException("More than one application in jar file match '" + appName + "'");
        } else {
          AppFactory appFactory = matchingAppFactories.get(0);
          LogicalPlan logicalPlan = appFactory.createApp(submitApp.getLogicalPlanConfiguration());
          File file = new File(outfilename);
          if (!file.exists()) {
            file.createNewFile();
          }
          LogicalPlanSerializer.convertToProperties(logicalPlan).save(file);
        }
      } else {
        if (currentApp == null) {
          throw new CliException("No application selected");
        }
        JSONObject response = getResource(StramWebServices.PATH_LOGICAL_PLAN, currentApp);
        File file = new File(outfilename);
        if (!file.exists()) {
          file.createNewFile();
        }
        LogicalPlanSerializer.convertToProperties(response).save(file);
      }
      System.out.println("Property file is saved at " + outfilename);
    }

  }

  private class CreateOperatorCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      String operatorName = args[1];
      String className = args[2];
      CreateOperatorRequest request = new CreateOperatorRequest();
      request.setOperatorName(operatorName);
      request.setOperatorFQCN(className);
      logicalPlanRequestQueue.add(request);
    }

  }

  private class RemoveOperatorCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      String operatorName = args[1];
      RemoveOperatorRequest request = new RemoveOperatorRequest();
      request.setOperatorName(operatorName);
      logicalPlanRequestQueue.add(request);
    }

  }

  private class CreateStreamCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      String streamName = args[1];
      String sourceOperatorName = args[2];
      String sourcePortName = args[3];
      String sinkOperatorName = args[4];
      String sinkPortName = args[5];
      CreateStreamRequest request = new CreateStreamRequest();
      request.setStreamName(streamName);
      request.setSourceOperatorName(sourceOperatorName);
      request.setSinkOperatorName(sinkOperatorName);
      request.setSourceOperatorPortName(sourcePortName);
      request.setSinkOperatorPortName(sinkPortName);
      logicalPlanRequestQueue.add(request);
    }

  }

  private class AddStreamSinkCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      String streamName = args[1];
      String sinkOperatorName = args[2];
      String sinkPortName = args[3];
      AddStreamSinkRequest request = new AddStreamSinkRequest();
      request.setStreamName(streamName);
      request.setSinkOperatorName(sinkOperatorName);
      request.setSinkOperatorPortName(sinkPortName);
      logicalPlanRequestQueue.add(request);
    }

  }

  private class RemoveStreamCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      String streamName = args[1];
      RemoveStreamRequest request = new RemoveStreamRequest();
      request.setStreamName(streamName);
      logicalPlanRequestQueue.add(request);
    }

  }

  private class SetOperatorAttributeCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      String operatorName = args[1];
      String attributeName = args[2];
      String attributeValue = args[3];
      SetOperatorAttributeRequest request = new SetOperatorAttributeRequest();
      request.setOperatorName(operatorName);
      request.setAttributeName(attributeName);
      request.setAttributeValue(attributeValue);
      logicalPlanRequestQueue.add(request);
    }

  }

  private class SetStreamAttributeCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      String streamName = args[1];
      String attributeName = args[2];
      String attributeValue = args[3];
      SetStreamAttributeRequest request = new SetStreamAttributeRequest();
      request.setStreamName(streamName);
      request.setAttributeName(attributeName);
      request.setAttributeValue(attributeValue);
      logicalPlanRequestQueue.add(request);
    }

  }

  private class SetPortAttributeCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      String operatorName = args[1];
      String attributeName = args[2];
      String attributeValue = args[3];
      SetPortAttributeRequest request = new SetPortAttributeRequest();
      request.setOperatorName(operatorName);
      request.setAttributeName(attributeName);
      request.setAttributeValue(attributeValue);
      logicalPlanRequestQueue.add(request);
    }

  }

  private class AbortCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      logicalPlanRequestQueue.clear();
      changingLogicalPlan = false;
      reader.setHistory(topLevelHistory);
    }

  }

  private class SubmitCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      if (logicalPlanRequestQueue.isEmpty()) {
        throw new CliException("Nothing to submit. Type \"abort\" to abort change");
      }
      StramAgent.StramUriSpec uriSpec = new StramAgent.StramUriSpec();
      uriSpec = uriSpec.path(StramWebServices.PATH_LOGICAL_PLAN);
      try {
        final Map<String, Object> m = new HashMap<>();
        ObjectMapper mapper = new ObjectMapper();
        m.put("requests", logicalPlanRequestQueue);
        final JSONObject jsonRequest = new JSONObject(mapper.writeValueAsString(m));

        JSONObject response = getResource(uriSpec, currentApp, new WebServicesClient.WebServicesHandler<JSONObject>()
        {
          @Override
          public JSONObject process(WebResource.Builder webResource, Class<JSONObject> clazz)
          {
            return webResource.accept(MediaType.APPLICATION_JSON).post(JSONObject.class, jsonRequest);
          }

        });
        printJson(response);
      } catch (Exception e) {
        throw new CliException("Failed web service request for appid " + currentApp.getApplicationId().toString(), e);
      }
      logicalPlanRequestQueue.clear();
      changingLogicalPlan = false;
      reader.setHistory(topLevelHistory);
    }

  }

  private class ShowQueueCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      printJson(logicalPlanRequestQueue, "queue");
      if (consolePresent) {
        System.out.println("Total operations in queue: " + logicalPlanRequestQueue.size());
      }
    }

  }

  private class BeginMacroCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      String name = args[1];
      if (macros.containsKey(name) || aliases.containsKey(name)) {
        System.err.println("Name '" + name + "' already exists.");
        return;
      }
      try {
        List<String> commands = new ArrayList<>();
        while (true) {
          String line;
          if (consolePresent) {
            line = reader.readLine("macro def (" + name + ") > ");
          } else {
            line = reader.readLine("", (char)0);
          }
          if (line.equals("end")) {
            macros.put(name, commands);
            updateCompleter(reader);
            if (consolePresent) {
              System.out.println("Macro '" + name + "' created.");
            }
            return;
          } else if (line.equals("abort")) {
            System.err.println("Aborted");
            return;
          } else {
            commands.add(line);
          }
        }
      } catch (IOException ex) {
        System.err.println("Aborted");
      }
    }

  }

  private class SetPagerCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      if (args[1].equals("off")) {
        pagerCommand = null;
      } else if (args[1].equals("on")) {
        if (consolePresent) {
          pagerCommand = "less -F -X -r";
        }
      } else {
        throw new CliException("set-pager parameter is either on or off.");
      }
    }

  }

  private class GetAppInfoCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      ApplicationReport appReport;
      if (args.length > 1) {
        appReport = getApplication(args[1]);
        if (appReport == null) {
          throw new CliException("Streaming application with id " + args[1] + " is not found.");
        }
      } else {
        if (currentApp == null) {
          throw new CliException("No application selected");
        }
        // refresh the state in currentApp
        currentApp = yarnClient.getApplicationReport(currentApp.getApplicationId());
        appReport = currentApp;
      }
      JSONObject response;
      try {
        response = getResource(StramWebServices.PATH_INFO, currentApp);
      } catch (Exception ex) {
        response = new JSONObject();
        response.put("startTime", appReport.getStartTime());
        response.put("id", appReport.getApplicationId().toString());
        response.put("name", appReport.getName());
        response.put("user", appReport.getUser());
      }
      response.put("state", appReport.getYarnApplicationState().name());
      response.put("trackingUrl", appReport.getTrackingUrl());
      response.put("finalStatus", appReport.getFinalApplicationStatus());
      JSONArray tags = new JSONArray();
      for (String tag : appReport.getApplicationTags()) {
        tags.put(tag);
      }
      response.put("tags", tags);
      printJson(response);
    }

  }

  private class GetContainerStackTrace implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      String containerLongId = getContainerLongId(args[1]);
      if (containerLongId == null) {
        throw new CliException("Container " + args[1] + " not found");
      }

      JSONObject response;
      try {
        response = getResource(StramWebServices.PATH_PHYSICAL_PLAN_CONTAINERS + "/" + args[1] + "/" + StramWebServices.PATH_STACKTRACE, currentApp);
      } catch (Exception ex) {
        throw new CliException("Webservice call to AppMaster failed.", ex);
      }

      printJson(response);
    }

  }

  private class GetAppPackageInfoCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      String[] tmpArgs = new String[args.length - 2];
      System.arraycopy(args, 2, tmpArgs, 0, args.length - 2);
      GetAppPackageInfoCommandLineInfo commandLineInfo = getGetAppPackageInfoCommandLineInfo(tmpArgs);
      try (AppPackage ap = newAppPackageInstance(new URI(args[1]), true)) {
        JSONSerializationProvider jomp = new JSONSerializationProvider();
        jomp.addSerializer(PropertyInfo.class,
            new AppPackage.PropertyInfoSerializer(commandLineInfo.provideDescription));
        JSONObject apInfo = new JSONObject(jomp.getContext(null).writeValueAsString(ap));
        apInfo.remove("name");
        printJson(apInfo);
      }
    }
  }

  private void checkConfigPackageCompatible(AppPackage ap, ConfigPackage cp)
  {
    if (cp == null) {
      return;
    }
    String requiredAppPackageName = cp.getAppPackageName();
    String requiredAppPackageGroupId = cp.getAppPackageGroupId();
    if (requiredAppPackageName != null && !requiredAppPackageName.equals(ap.getAppPackageName())) {
      throw new CliException("Config package requires an app package name of \"" + requiredAppPackageName + "\". The app package given has the name of \"" + ap.getAppPackageName() + "\"");
    }
    if (requiredAppPackageGroupId != null && !requiredAppPackageGroupId.equals(ap.getAppPackageGroupId())) {
      throw new CliException("Config package requires an app package group id of \"" + requiredAppPackageGroupId +
          "\". The app package given has the groupId of \"" + ap.getAppPackageGroupId() + "\"");
    }
    String requiredAppPackageMinVersion = cp.getAppPackageMinVersion();
    if (requiredAppPackageMinVersion != null && VersionInfo.compare(requiredAppPackageMinVersion, ap.getAppPackageVersion()) > 0) {
      throw new CliException("Config package requires an app package minimum version of \"" + requiredAppPackageMinVersion + "\". The app package given is of version \"" + ap.getAppPackageVersion() + "\"");
    }
    String requiredAppPackageMaxVersion = cp.getAppPackageMaxVersion();
    if (requiredAppPackageMaxVersion != null && VersionInfo.compare(requiredAppPackageMaxVersion, ap.getAppPackageVersion()) < 0) {
      throw new CliException("Config package requires an app package maximum version of \"" + requiredAppPackageMaxVersion + "\". The app package given is of version \"" + ap.getAppPackageVersion() + "\"");
    }
  }

  private void checkPlatformCompatible(AppPackage ap)
  {
    String apVersion = ap.getDtEngineVersion();
    VersionInfo actualVersion = VersionInfo.APEX_VERSION;
    if (!VersionInfo.isCompatible(actualVersion.getVersion(), apVersion)) {
      throw new CliException("This App Package is compiled with Apache Apex Core API version " + apVersion + ", which is incompatible with this Apex Core version " + actualVersion.getVersion());
    }
  }

  private void launchAppPackage(AppPackage ap, ConfigPackage cp, LaunchCommandLineInfo commandLineInfo, ConsoleReader reader) throws Exception
  {
    new LaunchCommand().execute(getLaunchAppPackageArgs(ap, cp, commandLineInfo, reader), reader);
  }

  String[] getLaunchAppPackageArgs(AppPackage ap, ConfigPackage cp, LaunchCommandLineInfo commandLineInfo, ConsoleReader reader) throws Exception
  {
    String matchAppName = null;
    if (commandLineInfo.args.length > 1) {
      matchAppName = commandLineInfo.args[1];
    }

    List<AppInfo> applications = new ArrayList<>(getAppsFromPackageAndConfig(ap, cp, commandLineInfo.useConfigApps));

    if (matchAppName != null) {
      Iterator<AppInfo> it = applications.iterator();
      while (it.hasNext()) {
        AppInfo ai = it.next();
        if ((commandLineInfo.exactMatch && !ai.name.equals(matchAppName))
            || !ai.name.toLowerCase().matches(".*" + matchAppName.toLowerCase() + ".*")) {
          it.remove();
        }
      }
    }

    AppInfo selectedApp = null;

    if (applications.isEmpty()) {
      throw new CliException("No applications in Application Package" + (matchAppName != null ? " matching \"" + matchAppName + "\"" : ""));
    } else if (applications.size() == 1) {
      selectedApp = applications.get(0);
    } else {
      //Store the appNames sorted in alphabetical order and their position in matchingAppFactories list
      TreeMap<String, Integer> appNamesInAlphabeticalOrder = new TreeMap<>();
      // Display matching applications
      for (int i = 0; i < applications.size(); i++) {
        String appName = applications.get(i).name;
        appNamesInAlphabeticalOrder.put(appName, i);
      }

      //Create a mapping between the app display number and original index at matchingAppFactories
      int index = 1;
      HashMap<Integer, Integer> displayIndexToOriginalUnsortedIndexMap = new HashMap<>();
      for (Map.Entry<String, Integer> entry : appNamesInAlphabeticalOrder.entrySet()) {
        //Map display number of the app to original unsorted index
        displayIndexToOriginalUnsortedIndexMap.put(index, entry.getValue());

        //Display the app names
        System.out.printf("%3d. %s\n", index++, entry.getKey());
      }

      // Exit if not in interactive mode
      if (!consolePresent) {
        throw new CliException("More than one application in Application Package match '" + matchAppName + "'");
      } else {
        boolean useHistory = reader.isHistoryEnabled();
        reader.setHistoryEnabled(false);
        History previousHistory = reader.getHistory();
        History dummyHistory = new MemoryHistory();
        reader.setHistory(dummyHistory);
        List<Completer> completers = new ArrayList<>(reader.getCompleters());
        for (Completer c : completers) {
          reader.removeCompleter(c);
        }
        reader.setHandleUserInterrupt(true);
        String optionLine;
        try {
          optionLine = reader.readLine("Choose application: ");
        } finally {
          reader.setHandleUserInterrupt(false);
          reader.setHistoryEnabled(useHistory);
          reader.setHistory(previousHistory);
          for (Completer c : completers) {
            reader.addCompleter(c);
          }
        }
        try {
          int option = Integer.parseInt(optionLine);
          if (0 < option && option <= applications.size()) {
            int appIndex = displayIndexToOriginalUnsortedIndexMap.get(option);
            selectedApp = applications.get(appIndex);
          }
        } catch (Exception ex) {
          // ignore
        }
      }
    }

    if (selectedApp == null) {
      throw new CliException("No application selected");
    }

    DTConfiguration launchProperties = getLaunchAppPackageProperties(ap, cp, commandLineInfo, selectedApp.name);
    String appFile = ap.tempDirectory() + "/app/" + selectedApp.file;

    List<String> launchArgs = new ArrayList<>();

    launchArgs.add("launch");
    launchArgs.add("-exactMatch");
    List<String> absClassPath = new ArrayList<>(ap.getClassPath());
    for (int i = 0; i < absClassPath.size(); i++) {
      String path = absClassPath.get(i);
      if (!path.startsWith("/")) {
        absClassPath.set(i, ap.tempDirectory() + "/" + path);
      }
    }

    if (cp != null) {
      StringBuilder files = new StringBuilder();
      for (String file : cp.getClassPath()) {
        if (files.length() != 0) {
          files.append(',');
        }
        files.append(cp.tempDirectory()).append(File.separatorChar).append(file);
      }
      if (!StringUtils.isBlank(files.toString())) {
        if (commandLineInfo.libjars != null) {
          commandLineInfo.libjars = files.toString() + "," + commandLineInfo.libjars;
        } else {
          commandLineInfo.libjars = files.toString();
        }
      }

      files.setLength(0);
      for (String file : cp.getFiles()) {
        if (files.length() != 0) {
          files.append(',');
        }
        files.append(cp.tempDirectory()).append(File.separatorChar).append(file);
      }
      if (!StringUtils.isBlank(files.toString())) {
        if (commandLineInfo.files != null) {
          commandLineInfo.files = files.toString() + "," + commandLineInfo.files;
        } else {
          commandLineInfo.files = files.toString();
        }
      }
    }

    StringBuilder libjarsVal = new StringBuilder();
    if (!absClassPath.isEmpty() || commandLineInfo.libjars != null) {
      if (!absClassPath.isEmpty()) {
        libjarsVal.append(org.apache.commons.lang3.StringUtils.join(absClassPath, ','));
      }
      if (commandLineInfo.libjars != null) {
        if (libjarsVal.length() > 0) {
          libjarsVal.append(",");
        }
        libjarsVal.append(commandLineInfo.libjars);
      }
    }
    if (appFile.endsWith(".json") || appFile.endsWith(".properties")) {
      if (libjarsVal.length() > 0) {
        libjarsVal.append(",");
      }
      libjarsVal.append(ap.tempDirectory()).append("/app/*.jar");
    }
    if (libjarsVal.length() > 0) {
      launchArgs.add("-libjars");
      launchArgs.add(libjarsVal.toString());
    }

    File launchPropertiesFile = new File(ap.tempDirectory(), "launch.xml");
    launchProperties.writeToFile(launchPropertiesFile, "");
    launchArgs.add("-conf");
    launchArgs.add(launchPropertiesFile.getCanonicalPath());
    if (commandLineInfo.localMode) {
      launchArgs.add("-local");
    }
    if (commandLineInfo.archives != null) {
      launchArgs.add("-archives");
      launchArgs.add(commandLineInfo.archives);
    }
    if (commandLineInfo.files != null) {
      launchArgs.add("-files");
      launchArgs.add(commandLineInfo.files);
    }
    if (commandLineInfo.origAppId != null) {
      launchArgs.add("-originalAppId");
      launchArgs.add(commandLineInfo.origAppId);
    }
    if (commandLineInfo.queue != null) {
      launchArgs.add("-queue");
      launchArgs.add(commandLineInfo.queue);
    }
    if (commandLineInfo.tags != null) {
      launchArgs.add("-tags");
      launchArgs.add(commandLineInfo.tags);
    }
    launchArgs.add(appFile);
    if (!appFile.endsWith(".json") && !appFile.endsWith(".properties")) {
      launchArgs.add(selectedApp.name);
    }

    LOG.debug("Launch command: {}", StringUtils.join(launchArgs, " "));
    return launchArgs.toArray(new String[]{});
  }


  DTConfiguration getLaunchAppPackageProperties(AppPackage ap, ConfigPackage cp, LaunchCommandLineInfo commandLineInfo, String appName) throws Exception
  {
    DTConfiguration launchProperties = new DTConfiguration();

    List<AppInfo> applications = getAppsFromPackageAndConfig(ap, cp, commandLineInfo.useConfigApps);

    AppInfo selectedApp = null;
    for (AppInfo app : applications) {
      if (app.name.equals(appName)) {
        selectedApp = app;
        break;
      }
    }
    Map<String, PropertyInfo> defaultProperties = selectedApp == null ? ap.getDefaultProperties() : selectedApp.defaultProperties;
    Set<String> requiredProperties = new TreeSet<>(selectedApp == null ? ap.getRequiredProperties() : selectedApp.requiredProperties);

    for (Map.Entry<String, PropertyInfo> entry : defaultProperties.entrySet()) {
      launchProperties.set(entry.getKey(), entry.getValue().getValue(), Scope.TRANSIENT, entry.getValue().getDescription());
      requiredProperties.remove(entry.getKey());
    }

      // settings specified in the user's environment take precedence over defaults in package.
    // since both are merged into a single -conf option below, apply them on top of the defaults here.
    File confFile = new File(StramClientUtils.getUserDTDirectory(), StramClientUtils.DT_SITE_XML_FILE);
    if (confFile.exists()) {
      Configuration userConf = new Configuration(false);
      userConf.addResource(new Path(confFile.toURI()));
      Iterator<Entry<String, String>> it = userConf.iterator();
      while (it.hasNext()) {
        Entry<String, String> entry = it.next();
        // filter relevant entries
        String key = entry.getKey();
        if (key.startsWith(StreamingApplication.DT_PREFIX)
            || key.startsWith(StreamingApplication.APEX_PREFIX)) {
          launchProperties.set(key, entry.getValue(), Scope.TRANSIENT, null);
          requiredProperties.remove(key);
        }
      }
    }

    if (commandLineInfo.apConfigFile != null) {
      DTConfiguration givenConfig = new DTConfiguration();
      givenConfig.loadFile(new File(ap.tempDirectory() + "/conf/" + commandLineInfo.apConfigFile));
      for (Map.Entry<String, String> entry : givenConfig) {
        launchProperties.set(entry.getKey(), entry.getValue(), Scope.TRANSIENT, null);
        requiredProperties.remove(entry.getKey());
      }
    }
    if (cp != null) {
      Map<String, String> properties = cp.getProperties(appName);
      for (Map.Entry<String, String> entry : properties.entrySet()) {
        launchProperties.set(entry.getKey(), entry.getValue(), Scope.TRANSIENT, null);
        requiredProperties.remove(entry.getKey());
      }
    } else if (commandLineInfo.configFile != null) {
      DTConfiguration givenConfig = new DTConfiguration();
      givenConfig.loadFile(new File(commandLineInfo.configFile));
      for (Map.Entry<String, String> entry : givenConfig) {
        launchProperties.set(entry.getKey(), entry.getValue(), Scope.TRANSIENT, null);
        requiredProperties.remove(entry.getKey());
      }
    }
    if (commandLineInfo.overrideProperties != null) {
      for (Map.Entry<String, String> entry : commandLineInfo.overrideProperties.entrySet()) {
        launchProperties.set(entry.getKey(), entry.getValue(), Scope.TRANSIENT, null);
        requiredProperties.remove(entry.getKey());
      }
    }

    // now look at whether it is in default configuration
    for (Map.Entry<String, String> entry : conf) {
      if (StringUtils.isNotBlank(entry.getValue())) {
        requiredProperties.remove(entry.getKey());
      }
    }
    if (!requiredProperties.isEmpty()) {
      throw new CliException("Required properties not set: " + StringUtils.join(requiredProperties, ", "));
    }

    //StramClientUtils.evalProperties(launchProperties);
    return launchProperties;
  }

  private List<AppInfo> getAppsFromPackageAndConfig(AppPackage ap, ConfigPackage cp, String configApps)
  {
    if (cp == null || configApps == null || !(configApps.equals(CONFIG_INCLUSIVE) || configApps.equals(CONFIG_EXCLUSIVE))) {
      return ap.getApplications();
    }

    File src = new File(cp.tempDirectory(), "app");
    File dest = new File(ap.tempDirectory(), "app");

    if (!src.exists()) {
      return ap.getApplications();
    }

    if (configApps.equals(CONFIG_EXCLUSIVE)) {

      for (File file : dest.listFiles()) {

        if (file.getName().endsWith(".json")) {
          FileUtils.deleteQuietly(new File(dest, file.getName()));
        }
      }
    } else {
      for (File file : src.listFiles()) {
        FileUtils.deleteQuietly(new File(dest, file.getName()));
      }
    }

    for (File file : src.listFiles()) {
      try {
        FileUtils.moveFileToDirectory(file, dest, true);
      } catch (IOException e) {
        LOG.warn("Application from the config file {} failed while processing.", file.getName());
      }
    }

    try {
      FileUtils.deleteDirectory(src);
    } catch (IOException e) {
      LOG.warn("Failed to delete the Config Apps folder");
    }

    ap.processAppDirectory(configApps.equals(CONFIG_EXCLUSIVE));

    return ap.getApplications();
  }

  private class GetAppPackageOperatorsCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      String[] tmpArgs = new String[args.length - 1];
      System.arraycopy(args, 1, tmpArgs, 0, args.length - 1);
      GetOperatorClassesCommandLineInfo commandLineInfo = getGetOperatorClassesCommandLineInfo(tmpArgs);
      try (AppPackage ap = newAppPackageInstance(new URI(commandLineInfo.args[0]), true)) {
        List<String> newArgs = new ArrayList<>();
        List<String> jars = new ArrayList<>();
        for (String jar : ap.getAppJars()) {
          jars.add(ap.tempDirectory() + "/app/" + jar);
        }
        for (String libJar : ap.getClassPath()) {
          jars.add(ap.tempDirectory() + "/" + libJar);
        }
        newArgs.add("get-jar-operator-classes");
        if (commandLineInfo.parent != null) {
          newArgs.add("-parent");
          newArgs.add(commandLineInfo.parent);
        }
        newArgs.add(StringUtils.join(jars, ","));
        for (int i = 1; i < commandLineInfo.args.length; i++) {
          newArgs.add(commandLineInfo.args[i]);
        }
        LOG.debug("Executing: " + newArgs);
        new GetJarOperatorClassesCommand().execute(newArgs.toArray(new String[]{}), reader);
      }
    }

  }

  private class GetAppPackageOperatorPropertiesCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      try (AppPackage ap = newAppPackageInstance(new URI(args[1]), true)) {
        List<String> newArgs = new ArrayList<>();
        List<String> jars = new ArrayList<>();
        for (String jar : ap.getAppJars()) {
          jars.add(ap.tempDirectory() + "/app/" + jar);
        }
        for (String libJar : ap.getClassPath()) {
          jars.add(ap.tempDirectory() + "/" + libJar);
        }
        newArgs.add("get-jar-operator-properties");
        newArgs.add(StringUtils.join(jars, ","));
        newArgs.add(args[2]);
        new GetJarOperatorPropertiesCommand().execute(newArgs.toArray(new String[]{}), reader);
      }
    }

  }

  private enum AttributesType
  {
    APPLICATION, OPERATOR, PORT
  }

  private class ListDefaultAttributesCommand implements Command
  {
    private final AttributesType type;

    protected ListDefaultAttributesCommand(@NotNull AttributesType type)
    {
      this.type = Preconditions.checkNotNull(type);
    }

    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      JSONObject result;
      if (type == AttributesType.APPLICATION) {
        result = TypeDiscoverer.getAppAttributes();
      } else if (type == AttributesType.OPERATOR) {
        result = TypeDiscoverer.getOperatorAttributes();
      } else {
        //get port attributes
        result = TypeDiscoverer.getPortAttributes();
      }
      printJson(result);
    }
  }

  private class CleanAppDirectoriesCommand implements Command
  {
    @Override
    public void execute(String[] args, ConsoleReader reader) throws Exception
    {
      JSONObject result = new JSONObject();
      JSONArray appArray = new JSONArray();
      List<ApplicationReport> apps = StramClientUtils.cleanAppDirectories(yarnClient, conf, fs,
          System.currentTimeMillis() - Long.valueOf(args[1]));
      for (ApplicationReport app : apps) {
        appArray.put(app.getApplicationId().toString());
      }
      result.put("applications", appArray);
      printJson(result);
    }
  }

  @SuppressWarnings("static-access")
  public static class GetPhysicalPropertiesCommandLineOptions
  {
    final Options options = new Options();
    final Option propertyName = add(OptionBuilder.withArgName("property name").hasArg().withDescription("The name of the property whose value needs to be retrieved").create("propertyName"));
    final Option waitTime = add(OptionBuilder.withArgName("wait time").hasArg().withDescription("How long to wait to get the result").create("waitTime"));

    private Option add(Option opt)
    {
      this.options.addOption(opt);
      return opt;
    }
  }

  private static GetPhysicalPropertiesCommandLineOptions GET_PHYSICAL_PROPERTY_OPTIONS = new GetPhysicalPropertiesCommandLineOptions();

  @SuppressWarnings("static-access")
  public static class LaunchCommandLineOptions
  {
    final Options options = new Options();
    final Option local = add(new Option("local", "Run application in local mode."));
    final Option configFile = add(OptionBuilder.withArgName("configuration file").hasArg().withDescription("Specify an application configuration file.").create("conf"));
    final Option apConfigFile = add(OptionBuilder.withArgName("app package configuration file").hasArg().withDescription("Specify an application configuration file within the app package if launching an app package.").create("apconf"));
    final Option defProperty = add(OptionBuilder.withArgName("property=value").hasArg().withDescription("Use value for given property.").create("D"));
    final Option libjars = add(OptionBuilder.withArgName("comma separated list of libjars").hasArg().withDescription("Specify comma separated jar files or other resource files to include in the classpath.").create("libjars"));
    final Option files = add(OptionBuilder.withArgName("comma separated list of files").hasArg().withDescription("Specify comma separated files to be copied on the compute machines.").create("files"));
    final Option archives = add(OptionBuilder.withArgName("comma separated list of archives").hasArg().withDescription("Specify comma separated archives to be unarchived on the compute machines.").create("archives"));
    final Option ignorePom = add(new Option("ignorepom", "Do not run maven to find the dependency"));
    final Option originalAppID = add(OptionBuilder.withArgName("application id").hasArg().withDescription("Specify original application identifier for restart.").create("originalAppId"));
    final Option exactMatch = add(new Option("exactMatch", "Only consider applications with exact app name"));
    final Option queue = add(OptionBuilder.withArgName("queue name").hasArg().withDescription("Specify the queue to launch the application").create("queue"));
    final Option tags = add(OptionBuilder.withArgName("comma separated tags").hasArg().withDescription("Specify the tags for the application").create("tags"));
    final Option force = add(new Option("force", "Force launch the application. Do not check for compatibility"));
    final Option useConfigApps = add(OptionBuilder.withArgName("inclusive or exclusive").hasArg().withDescription("\"inclusive\" - merge the apps in config and app package. \"exclusive\" - only show config package apps.").create("useConfigApps"));

    private Option add(Option opt)
    {
      this.options.addOption(opt);
      return opt;
    }

  }

  private static LaunchCommandLineOptions LAUNCH_OPTIONS = new LaunchCommandLineOptions();

  static LaunchCommandLineInfo getLaunchCommandLineInfo(String[] args) throws ParseException
  {
    CommandLineParser parser = new PosixParser();
    LaunchCommandLineInfo result = new LaunchCommandLineInfo();
    CommandLine line = parser.parse(LAUNCH_OPTIONS.options, args);
    result.localMode = line.hasOption(LAUNCH_OPTIONS.local.getOpt());
    result.configFile = line.getOptionValue(LAUNCH_OPTIONS.configFile.getOpt());
    result.apConfigFile = line.getOptionValue(LAUNCH_OPTIONS.apConfigFile.getOpt());
    result.ignorePom = line.hasOption(LAUNCH_OPTIONS.ignorePom.getOpt());
    String[] defs = line.getOptionValues(LAUNCH_OPTIONS.defProperty.getOpt());
    if (defs != null) {
      result.overrideProperties = new HashMap<>();
      for (String def : defs) {
        int equal = def.indexOf('=');
        if (equal < 0) {
          result.overrideProperties.put(def, null);
        } else {
          result.overrideProperties.put(def.substring(0, equal), def.substring(equal + 1));
        }
      }
    }
    result.libjars = line.getOptionValue(LAUNCH_OPTIONS.libjars.getOpt());
    result.archives = line.getOptionValue(LAUNCH_OPTIONS.archives.getOpt());
    result.files = line.getOptionValue(LAUNCH_OPTIONS.files.getOpt());
    result.queue = line.getOptionValue(LAUNCH_OPTIONS.queue.getOpt());
    result.tags = line.getOptionValue(LAUNCH_OPTIONS.tags.getOpt());
    result.args = line.getArgs();
    result.origAppId = line.getOptionValue(LAUNCH_OPTIONS.originalAppID.getOpt());
    result.exactMatch = line.hasOption("exactMatch");
    result.force = line.hasOption("force");
    result.useConfigApps = line.getOptionValue(LAUNCH_OPTIONS.useConfigApps.getOpt());

    return result;
  }

  static class LaunchCommandLineInfo
  {
    boolean localMode;
    boolean ignorePom;
    String configFile;
    String apConfigFile;
    Map<String, String> overrideProperties;
    String libjars;
    String files;
    String queue;
    String tags;
    String archives;
    String origAppId;
    boolean exactMatch;
    boolean force;
    String[] args;
    String useConfigApps;
  }

  @SuppressWarnings("static-access")
  public static Options getShowLogicalPlanCommandLineOptions()
  {
    Options options = new Options();
    Option libjars = OptionBuilder.withArgName("comma separated list of jars").hasArg().withDescription("Specify comma separated jar/resource files to include in the classpath.").create("libjars");
    Option ignorePom = new Option("ignorepom", "Do not run maven to find the dependency");
    Option exactMatch = new Option("exactMatch", "Only consider exact match for app name");
    options.addOption(libjars);
    options.addOption(ignorePom);
    options.addOption(exactMatch);
    return options;
  }

  private static ShowLogicalPlanCommandLineInfo getShowLogicalPlanCommandLineInfo(String[] args) throws ParseException
  {
    CommandLineParser parser = new PosixParser();
    ShowLogicalPlanCommandLineInfo result = new ShowLogicalPlanCommandLineInfo();
    CommandLine line = parser.parse(getShowLogicalPlanCommandLineOptions(), args);
    result.libjars = line.getOptionValue("libjars");
    result.ignorePom = line.hasOption("ignorepom");
    result.args = line.getArgs();
    result.exactMatch = line.hasOption("exactMatch");
    return result;
  }

  private static class ShowLogicalPlanCommandLineInfo
  {
    String libjars;
    boolean ignorePom;
    String[] args;
    boolean exactMatch;
  }

  public void mainHelper() throws Exception
  {
    init();
    run();
    System.exit(lastCommandError ? 1 : 0);
  }

  public static void main(final String[] args) throws Exception
  {
    final ApexCli shell = new ApexCli();
    shell.preImpersonationInit(args);
    String hadoopUserName = System.getenv("HADOOP_USER_NAME");
    if (UserGroupInformation.isSecurityEnabled()
        && StringUtils.isNotBlank(hadoopUserName)
        && !hadoopUserName.equals(UserGroupInformation.getLoginUser().getUserName())) {
      LOG.info("You ({}) are running as user {}", UserGroupInformation.getLoginUser().getUserName(), hadoopUserName);
      UserGroupInformation ugi = UserGroupInformation.createProxyUser(hadoopUserName, UserGroupInformation.getLoginUser());
      ugi.doAs(new PrivilegedExceptionAction<Void>()
      {
        @Override
        public Void run() throws Exception
        {
          shell.mainHelper();
          return null;
        }
      });
    } else {
      shell.mainHelper();
    }
  }

}
