blob: 6ebfe8dec9db9109fc0cc6dd8c494955373277a9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.sql.client.impl;
import java.io.File;
import java.io.PrintWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.samza.sql.client.cli.CliCommand;
import org.apache.samza.sql.client.cli.CliConstants;
import org.apache.samza.sql.client.cli.CliEnvironment;
import org.apache.samza.sql.client.cli.CliShell;
import org.apache.samza.sql.client.cli.CliView;
import org.apache.samza.sql.client.cli.QueryResultLogView;
import org.apache.samza.sql.client.exceptions.CommandHandlerException;
import org.apache.samza.sql.client.exceptions.ExecutorException;
import org.apache.samza.sql.client.interfaces.CommandHandler;
import org.apache.samza.sql.client.interfaces.ExecutionContext;
import org.apache.samza.sql.client.interfaces.ExecutionStatus;
import org.apache.samza.sql.client.interfaces.NonQueryResult;
import org.apache.samza.sql.client.interfaces.QueryResult;
import org.apache.samza.sql.client.interfaces.SqlExecutor;
import org.apache.samza.sql.client.interfaces.SqlFunction;
import org.apache.samza.sql.client.util.CliUtil;
import org.apache.samza.sql.schema.SamzaSqlFieldType;
import org.apache.samza.sql.schema.SqlFieldSchema;
import org.apache.samza.sql.schema.SqlSchema;
import org.jline.terminal.Terminal;
import org.jline.utils.AttributedStringBuilder;
import org.jline.utils.AttributedStyle;
import org.jline.utils.InfoCmp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Default {@link CommandHandler} which handles basic commands of type {@link CliCommandType}
*/
public class CliCommandHandler implements CommandHandler {
private static final Logger LOG = LoggerFactory.getLogger(CliCommandHandler.class);
private CliShell shell;
private PrintWriter writer;
private Terminal terminal;
private ExecutionContext exeContext;
private SqlExecutor executor;
private CliEnvironment env;
private Map<Integer, String> executions = new TreeMap<>();
/**
 * Binds this handler to the shell it serves and caches the collaborators the
 * command implementations rely on. The {@link SqlExecutor} is taken from the
 * given environment and the output writer from the given terminal.
 *
 * @param shell the {@link CliShell} which uses this CommandHandler
 * @param env the Shell's {@link CliEnvironment}
 * @param terminal the {@link Terminal} to print output and messages
 * @param exeContext the {@link ExecutionContext}
 */
public void init(CliShell shell, CliEnvironment env, Terminal terminal, ExecutionContext exeContext) {
  // Environment first: the executor is looked up through it.
  this.env = env;
  this.executor = env.getExecutor();

  // Terminal next: all textual output goes through its writer.
  this.terminal = terminal;
  this.writer = terminal.writer();

  this.shell = shell;
  this.exeContext = exeContext;
}
/**
 * Attempts to parse the given input string line into a {@link CliCommand} of this
 * handler's {@link org.apache.samza.sql.client.interfaces.CommandType}.
 *
 * <p>Command names are matched case-insensitively against the start of the line.
 * A match only counts when the name is the whole line, or is followed by a
 * character at or below {@link CliConstants#SPACE} and a non-empty parameter.
 *
 * @param line input line string
 * @return null if the line is null/empty after {@code CliUtil.trimCommand};
 *         otherwise the parsed {@link CliCommand}, which is of type
 *         {@code INVALID_COMMAND} when no command name matched
 */
public CliCommand parseLine(String line) {
  line = CliUtil.trimCommand(line);
  if (CliUtil.isNullOrEmpty(line)) {
    return null;
  }

  for (CliCommandType cmdType : CliCommandType.values()) {
    String cmdText = cmdType.getCommandName();
    int cmdLength = cmdText.length();

    // Match on the original line, character by character and independent of the
    // default locale. Upper-casing the whole line first is locale-sensitive
    // (e.g. 'i' under a Turkish locale) and may change the string length, which
    // would make cmdLength an unreliable index into the original line.
    if (!line.regionMatches(true, 0, cmdText, 0, cmdLength)) {
      continue;
    }

    if (line.length() == cmdLength) {
      return new CliCommand(cmdType);
    }

    // Require a separator after the name so that e.g. "LSX" is not parsed as "LS".
    if (line.charAt(cmdLength) <= CliConstants.SPACE) {
      String parameter = line.substring(cmdLength).trim();
      if (!parameter.isEmpty()) {
        return new CliCommand(cmdType, parameter);
      }
    }
  }
  return new CliCommand(CliCommandType.INVALID_COMMAND);
}
/**
 * Prints to terminal the help message of the commands this handler handles:
 * a short banner followed by one line per {@link CliCommandType} (name in bold,
 * then its description). {@code INVALID_COMMAND} is not listed.
 */
public void printHelpMessage() {
  AttributedStringBuilder builder = new AttributedStringBuilder();
  builder.append("The following commands are supported by ")
      .append(CliConstants.APP_NAME)
      // Leading space keeps the application name from running into "by".
      .append(" by handler ")
      .append(this.getClass().getName())
      .append(" at the moment.\n\n");

  for (CliCommandType cmdType : CliCommandType.values()) {
    if (cmdType == CliCommandType.INVALID_COMMAND) {
      continue;
    }

    String cmdText = cmdType.getCommandName();
    String cmdDescription = cmdType.getDescription();

    builder.style(AttributedStyle.DEFAULT.bold())
        .append(cmdText)
        .append("\t\t")
        .style(AttributedStyle.DEFAULT)
        .append(cmdDescription)
        .append("\n");
  }

  writer.println(builder.toAnsi());
  // Flush like the other command implementations so the help text shows immediately.
  writer.flush();
}
/**
 * Dispatches the given command to the matching {@code commandXxx} implementation.
 *
 * <p>If the command type requires arguments and none were supplied, the command
 * usage is printed and nothing else happens.
 *
 * @param command input {@link CliCommand} to handle
 * @return false if the command is EXIT or QUIT, true otherwise
 * @throws CommandHandlerException wrapping any exception raised while handling the input {@link CliCommand}
 */
public boolean handleCommand(CliCommand command) throws CommandHandlerException {
  boolean argsRequired = !command.getCommandType().argsAreOptional();
  if (argsRequired && CliUtil.isNullOrEmpty(command.getParameters())) {
    CliUtil.printCommandUsage(command, writer);
    return true;
  }

  try {
    CliCommandType type = (CliCommandType) command.getCommandType();
    switch (type) {
      case EXIT:
      case QUIT:
        // The only commands that ask the shell to stop.
        return false;

      case CLEAR:
        commandClear();
        break;

      case DESCRIBE:
        commandDescribe(command);
        break;

      case EXECUTE:
        commandExecuteFile(command);
        break;

      case HELP:
        commandHelp(command);
        break;

      case INSERT_INTO:
        commandInsertInto(command);
        break;

      case LS:
        commandLs(command);
        break;

      case RM:
        commandRm(command);
        break;

      case SELECT:
        commandSelect(command);
        break;

      case SET:
        commandSet(command);
        break;

      case SHOW_FUNCTIONS:
        commandShowFunctions(command);
        break;

      case SHOW_TABLES:
        commandShowTables(command);
        break;

      case STOP:
        commandStop(command);
        break;

      case VERSION:
        commandVersion();
        break;

      case INVALID_COMMAND:
        printHelpMessage();
        break;

      default: {
        // Command type exists but has no implementation here yet.
        String shownParams = CliUtil.isNullOrEmpty(command.getParameters()) ? "NULL" : command.getParameters();
        writer.println("UNDER DEVELOPEMENT. Command:" + command.getCommandType());
        writer.println("Parameters:" + shownParams);
        writer.flush();
      }
    }
  } catch (Exception e) {
    throw new CommandHandlerException(e);
  }

  return true;
}
/** VERSION command: prints the shell and executor version line. */
private void commandVersion() {
  this.printVersion();
}
/**
 * Prints one line with the shell's implementation version (as recorded for this
 * class's package), the executor's class name and the executor's version.
 */
private void printVersion() {
  // getPackage() may return null; fall back to a null version, which %s renders
  // as "null", instead of failing with a NullPointerException.
  Package pkg = this.getClass().getPackage();
  String shellVersion = pkg == null ? null : pkg.getImplementationVersion();

  String version = String.format("Shell version %s, Executor is %s, version %s",
      shellVersion,
      executor.getClass().getName(),
      executor.getVersion());
  writer.println(version);
  // Flush like the other command implementations so the line shows immediately.
  writer.flush();
}
/** CLEAR command: clears the terminal screen. */
private void commandClear() {
  this.clearScreen();
}
/**
 * DESCRIBE command: asks the executor for the schema of the table named by the
 * command parameters and prints it as a formatted table.
 *
 * @param command the DESCRIBE command; its parameters are passed to the executor as-is
 * @throws ExecutorException if the executor cannot provide the schema
 */
private void commandDescribe(CliCommand command) throws ExecutorException {
  SqlSchema tableSchema = executor.getTableSchema(exeContext, command.getParameters());
  formatSchema4Display(tableSchema).forEach(writer::println);
  writer.flush();
}
/**
 * SET command. Three forms are handled:
 * <ul>
 *   <li>no parameter: prints all environment variables;</li>
 *   <li>{@code name}: prints that variable if it has a non-empty value;</li>
 *   <li>{@code name=value}: sets the variable and reports the outcome.</li>
 * </ul>
 * Any other shape (embedded spaces, more than one '=') prints the command usage.
 * When the executor variable is changed, the current executor is stopped and the
 * one now provided by the environment is started.
 *
 * @param command the SET command
 * @throws Exception propagated from stopping/starting the executor
 */
private void commandSet(CliCommand command) throws Exception {
  String param = command.getParameters();
  if (CliUtil.isNullOrEmpty(param)) {
    env.printAll(writer);
    writer.flush();
    return;
  }

  String[] params = null;
  boolean syntaxValid = param.split(" ").length == 1;
  if (syntaxValid) {
    params = param.split("=");
    if (params.length == 1) {
      String value = env.getEnvironmentVariable(param);
      if (!CliUtil.isNullOrEmpty(value)) {
        env.printVariable(writer, param, value);
      }
      // This path used to return without flushing, so the value could stay buffered.
      writer.flush();
      return;
    }
    syntaxValid = params.length == 2;
  }

  if (!syntaxValid) {
    writer.println(command.getCommandType().getUsage());
    writer.flush();
    return;
  }

  String name = params[0].trim().toLowerCase();
  String value = params[1].trim();
  // Return codes as used below: 0 = set, -1 = unknown variable, -2 = invalid value.
  int ret = env.setEnvironmentVariable(name, value);
  if (ret == 0) {
    writer.print(name);
    writer.print(" set to ");
    writer.println(value);
    if (name.equals(CliConstants.CONFIG_EXECUTOR)) {
      // Swap executors: stop the old one before starting the newly configured one.
      executor.stop(exeContext);
      executor = env.getExecutor();
      executor.start(exeContext);
    }
  } else if (ret == -1) {
    writer.print("Unknow variable: ");
    writer.println(name);
  } else if (ret == -2) {
    writer.print("Invalid value: ");
    writer.print(value);
    String[] vals = env.getPossibleValues(name);
    if (vals != null && vals.length != 0) {
      writer.print(" Possible values:");
      for (String s : vals) {
        writer.print(CliConstants.SPACE);
        writer.print(s);
      }
    }
    // Always terminate the line, even when there are no possible values to list.
    writer.println();
  }

  writer.flush();
}
/**
 * EXECUTE command: interprets the command parameters as a URI, resolves its path
 * to a local file and submits that file to the executor as non-query statements.
 * Prints "Invalid URI." when the parameter is not a valid URI, has no path
 * component, or does not point at an existing non-directory file.
 *
 * @param command the EXECUTE command
 * @throws ExecutorException if the executor fails to execute the file
 */
private void commandExecuteFile(CliCommand command) throws ExecutorException {
  String fullCmdStr = command.getFullCommand();
  String parameters = command.getParameters();

  File file = null;
  try {
    // getPath() is null for opaque URIs (e.g. "c:foo.sql"); new File(null) would
    // throw a NullPointerException, so treat that case as an invalid URI instead.
    String path = new URI(parameters).getPath();
    if (path != null) {
      File candidate = new File(path);
      if (candidate.exists() && !candidate.isDirectory()) {
        file = candidate;
      }
    }
  } catch (URISyntaxException ignored) {
    // Deliberately ignored: reported to the user as "Invalid URI." below.
  }

  if (file == null) {
    writer.println("Invalid URI.");
    writer.flush();
    return;
  }

  NonQueryResult nonQueryResult = executor.executeNonQuery(exeContext, file);
  // Remember the submission so that LS / STOP / RM can refer to it by id.
  executions.put(nonQueryResult.getExecutionId(), fullCmdStr);
  List<String> submittedStmts = nonQueryResult.getSubmittedStmts();
  List<String> nonsubmittedStmts = nonQueryResult.getNonSubmittedStmts();

  writer.println("Sql file submitted. Execution ID: " + nonQueryResult.getExecutionId());
  writer.println("Submitted statements:");
  if (submittedStmts == null || submittedStmts.isEmpty()) {
    writer.println("\tNone.");
  } else {
    for (String statement : submittedStmts) {
      writer.print("\t");
      writer.println(statement);
    }
  }

  if (nonsubmittedStmts != null && !nonsubmittedStmts.isEmpty()) {
    writer.println("Statements NOT submitted:");
    for (String statement : nonsubmittedStmts) {
      writer.print("\t");
      writer.println(statement);
    }
  }

  writer.println("Note: All query statements in a sql file are NOT submitted.");
  writer.flush();
}
/**
 * INSERT INTO command: submits the full command text to the executor as a single
 * non-query statement, reports the execution id and records it in
 * {@code executions}.
 *
 * @param command the INSERT INTO command
 * @throws ExecutorException if the executor rejects or fails the statement
 */
private void commandInsertInto(CliCommand command) throws ExecutorException {
  final String statement = command.getFullCommand();
  final List<String> statements = Collections.singletonList(statement);
  final NonQueryResult submission = executor.executeNonQuery(exeContext, statements);

  writer.println("Execution submitted successfully. Id: " + submission.getExecutionId());
  executions.put(submission.getExecutionId(), statement);
  writer.flush();
}
/**
 * LS command: lists recorded executions with their id, current status and the
 * command text that started them. Without parameters all recorded executions are
 * listed; otherwise only the ids given in the parameters. Long command texts are
 * wrapped onto continuation lines, and rows alternate between two colours.
 *
 * @param command the LS command
 */
private void commandLs(CliCommand command) {
  List<Integer> execIds = new ArrayList<>();
  String parameters = command.getParameters();
  if (CliUtil.isNullOrEmpty(parameters)) {
    execIds.addAll(executions.keySet());
  } else {
    execIds.addAll(splitExecutionIds(parameters));
  }
  if (execIds.isEmpty()) {
    return;
  }

  execIds.sort(Integer::compareTo);

  final int terminalWidth = terminal.getWidth();
  final int idWidth = 3;
  final int statusWidth = 20;
  // At least one character of the command must be written per line: with a
  // non-positive width (narrow terminal, or a terminal reporting width 0) the
  // wrapping loop below would never advance and spin forever.
  final int cmdWidth = Math.max(1, terminalWidth - idWidth - statusWidth - 4);
  final int lineCapacity = Math.max(terminalWidth, 0);

  AttributedStyle oddLineStyle = AttributedStyle.BOLD.foreground(AttributedStyle.BLUE);
  AttributedStyle evenLineStyle = AttributedStyle.BOLD.foreground(AttributedStyle.CYAN);

  for (int i = 0; i < execIds.size(); ++i) {
    Integer id = execIds.get(i);
    String cmd = executions.get(id);
    if (cmd == null) {
      continue;
    }

    // Fall back to UNKNOWN when the executor cannot report the status.
    String status = "UNKNOWN";
    try {
      ExecutionStatus execStatus = executor.queryExecutionStatus(id);
      status = execStatus.name();
    } catch (ExecutorException e) {
      LOG.error("Error in commandLs: ", e);
    }

    AttributedStyle rowStyle = (i % 2 == 0) ? evenLineStyle : oddLineStyle;
    int cmdStartIdx = 0;
    int cmdLength = cmd.length();
    while (cmdStartIdx < cmdLength) {
      StringBuilder line = new StringBuilder(lineCapacity);
      if (cmdStartIdx == 0) {
        // Id and status appear only on the first line of a wrapped entry.
        line.append(CliConstants.SPACE);
        line.append(id);
        CliUtil.appendTo(line, 1 + idWidth + 1, CliConstants.SPACE);
        line.append(status);
      }
      CliUtil.appendTo(line, 1 + idWidth + 1 + statusWidth + 1, CliConstants.SPACE);

      int numToWrite = Math.min(cmdWidth, cmdLength - cmdStartIdx);
      line.append(cmd, cmdStartIdx, cmdStartIdx + numToWrite);
      cmdStartIdx += numToWrite;

      AttributedStringBuilder attrBuilder = new AttributedStringBuilder().style(rowStyle);
      attrBuilder.append(line.toString());
      writer.println(attrBuilder.toAnsi());
    }
  }
  writer.flush();
}
/**
 * RM command: removes each execution whose id is given in the parameters, both
 * from the executor and from {@code executions}. Executions that are still
 * running are skipped with a message; executor errors are printed and logged,
 * and processing continues with the next id.
 *
 * @param command the RM command
 */
private void commandRm(CliCommand command) {
  for (Integer execId : splitExecutionIds(command.getParameters())) {
    try {
      if (executor.queryExecutionStatus(execId) == ExecutionStatus.Running) {
        writer.println(String.format("Execution %d is still running. Stop it first.", execId));
      } else {
        executor.removeExecution(exeContext, execId);
        executions.remove(execId);
      }
    } catch (ExecutorException e) {
      writer.println("Error: " + e);
      LOG.error("Error in commandRm: ", e);
    }
  }
  writer.flush();
}
/**
 * SELECT command: starts the query on the executor, shows its result in a
 * {@link QueryResultLogView}, and stops the execution once the view returns.
 *
 * @param command the SELECT command; its full text is the query
 * @throws ExecutorException if the query cannot be started or stopped
 */
private void commandSelect(CliCommand command) throws ExecutorException {
  QueryResult queryResult = executor.executeQuery(exeContext, command.getFullCommand());
  try {
    CliView view = new QueryResultLogView();
    view.open(shell, queryResult);
  } finally {
    // Stop the query even if the view fails, so the execution is not left running.
    executor.stopExecution(exeContext, queryResult.getExecutionId());
  }
}
/**
 * SHOW TABLES command: prints every table name reported by the executor, one per line.
 *
 * @param command the SHOW TABLES command (its parameters are not used)
 * @throws ExecutorException if the executor cannot list tables
 */
private void commandShowTables(CliCommand command) throws ExecutorException {
  executor.listTables(exeContext).forEach(name -> writer.println(name));
  writer.flush();
}
/**
 * SHOW FUNCTIONS command: prints the string form of every function reported by
 * the executor, one per line.
 *
 * @param command the SHOW FUNCTIONS command (its parameters are not used)
 * @throws ExecutorException if the executor cannot list functions
 */
private void commandShowFunctions(CliCommand command) throws ExecutorException {
  executor.listFunctions(exeContext).forEach(function -> writer.println(function.toString()));
  writer.flush();
}
/**
 * STOP command: sends a stop request for each known execution id given in the
 * parameters. Tokens that are not known ids are reported by
 * {@code splitExecutionIds}; executor errors are printed and logged, and
 * processing continues with the next id.
 *
 * @param command the STOP command
 */
private void commandStop(CliCommand command) {
  String parameters = command.getParameters();
  if (CliUtil.isNullOrEmpty(parameters)) {
    CliUtil.printCommandUsage(command, writer);
    return;
  }

  // Same id parsing/validation as LS and RM, via the shared helper rather than a copy of it.
  List<Integer> execIds = splitExecutionIds(parameters);
  for (Integer id : execIds) {
    try {
      executor.stopExecution(exeContext, id);
      writer.println(String.format("Request to stop execution %d was sent.", id));
    } catch (ExecutorException e) {
      writer.println("Error: " + e);
      LOG.error("Error in commandStop: ", e);
    }
  }
  writer.flush();
}
/**
 * HELP command: without parameters prints the general help message; otherwise
 * prints the usage of the command whose name equals the parameter
 * (case-insensitively), or "Unknown command" if there is none.
 *
 * @param command the HELP command
 */
private void commandHelp(CliCommand command) {
  String parameters = command.getParameters();
  if (CliUtil.isNullOrEmpty(parameters)) {
    printHelpMessage();
    return;
  }

  String requested = parameters.trim();
  for (CliCommandType cmdType : CliCommandType.values()) {
    // equalsIgnoreCase compares per character and does not depend on the default
    // locale, unlike upper-casing the input first (e.g. 'i' under a Turkish locale).
    if (cmdType.getCommandName().equalsIgnoreCase(requested)) {
      writer.println(cmdType.getUsage());
      writer.flush();
      return;
    }
  }

  // Upper-cased here only for display, as before.
  writer.print("Unknown command: ");
  writer.println(requested.toUpperCase());
  writer.flush();
}
/** Sends the terminal's clear-screen capability and flushes it so it takes effect immediately. */
private void clearScreen() {
  terminal.puts(InfoCmp.Capability.clear_screen);
  // puts() only writes the control sequence to the terminal's output; push it out now.
  terminal.flush();
}
/*
 * Renders a schema as a two-column table, for example:
 *
 *  Field    | Type
 * -------------------------
 *  Field1   | Type 1
 *  Field2   | VARCHAR(STRING)
 *  Field... | VARCHAR(STRING)
 * -------------------------
 *
 * The column separator position is chosen to minimise the number of output rows
 * given the terminal width; field names and types that do not fit their column
 * are wrapped onto continuation rows. Body rows are ANSI-styled with alternating
 * colours per schema field. Returns the single line "Not enough room." when the
 * terminal is too narrow for even the header.
 */
private List<String> formatSchema4Display(SqlSchema schema) {
  final String headerField = "Field";
  final String headerType = "Type";
  final char seperator = '|';
  final char lineSep = '-';

  int terminalWidth = terminal.getWidth();
  // Two spaces * 2 plus one SEPERATOR
  if (terminalWidth < 2 + 2 + 1 + headerField.length() + headerType.length()) {
    return Collections.singletonList("Not enough room.");
  }

  // Resolve the texts that will actually be printed once, so that sizing and
  // rendering agree. Sizing previously measured the raw field type name while
  // rendering printed the (longer) display value for ARRAY/MAP/ROW types.
  List<SqlSchema.SqlField> fields = schema.getFields();
  int rowCount = fields.size();
  List<String> fieldTexts = new ArrayList<>(rowCount);
  List<String> typeTexts = new ArrayList<>(rowCount);
  for (SqlSchema.SqlField sqlField : fields) {
    fieldTexts.add(sqlField.getFieldName());
    typeTexts.add(getFieldDisplayValue(sqlField.getFieldSchema()));
  }

  // Find the best seperator position for least rows
  final int firstCandidate = headerField.length() + 2;
  int seperatorPos = firstCandidate;
  int minRowNeeded = Integer.MAX_VALUE;
  int longestLineCharNum = 0;
  for (int j = firstCandidate; j < terminalWidth - headerType.length() - 2; ++j) {
    boolean fieldWrapped = false;
    int rowNeeded = 0;
    int candidateLongest = 0;
    for (int i = 0; i < rowCount; ++i) {
      int fieldLen = fieldTexts.get(i).length();
      int typeLen = typeTexts.get(i).length();
      int fieldRowNeeded = CliUtil.ceilingDiv(fieldLen, j - 2);
      int typeRowNeeded = CliUtil.ceilingDiv(typeLen, terminalWidth - 1 - j - 2);

      rowNeeded += Math.max(fieldRowNeeded, typeRowNeeded);
      fieldWrapped |= fieldRowNeeded > 1;
      if (typeRowNeeded > 1) {
        // A wrapped type fills its column, so the line spans the terminal.
        candidateLongest = terminalWidth;
      } else {
        candidateLongest = Math.max(candidateLongest, j + typeLen + 2 + 1);
      }
    }
    if (rowNeeded < minRowNeeded) {
      minRowNeeded = rowNeeded;
      seperatorPos = j;
      // Keep the width that belongs to the chosen position, not the maximum over
      // every position tried, so the rules match the table actually drawn.
      longestLineCharNum = candidateLongest;
    }
    // Widening the field column further cannot save rows once no field wraps.
    if (!fieldWrapped) {
      break;
    }
  }

  List<String> lines = new ArrayList<>(minRowNeeded + 4);

  // Header
  StringBuilder line = new StringBuilder(terminalWidth);
  line.append(CliConstants.SPACE);
  line.append(headerField);
  CliUtil.appendTo(line, seperatorPos - 1, CliConstants.SPACE);
  line.append(seperator);
  line.append(CliConstants.SPACE);
  line.append(headerType);
  lines.add(line.toString());
  line = new StringBuilder(terminalWidth);
  CliUtil.appendTo(line, longestLineCharNum - 1, lineSep);
  lines.add(line.toString());

  // Body
  AttributedStyle oddLineStyle = AttributedStyle.BOLD.foreground(AttributedStyle.BLUE);
  AttributedStyle evenLineStyle = AttributedStyle.BOLD.foreground(AttributedStyle.CYAN);

  final int fieldColSize = seperatorPos - 2;
  final int typeColSize = terminalWidth - seperatorPos - 1 - 2;
  for (int i = 0; i < rowCount; ++i) {
    String field = fieldTexts.get(i);
    String type = typeTexts.get(i);
    int fieldLen = field.length();
    int typeLen = type.length();
    int fieldStartIdx = 0;
    int typeStartIdx = 0;
    // All rows of one schema field share a colour.
    AttributedStyle rowStyle = (i % 2 == 0) ? evenLineStyle : oddLineStyle;

    // Emit rows until both the field name and the type are fully written.
    while (fieldStartIdx < fieldLen || typeStartIdx < typeLen) {
      line = new StringBuilder(terminalWidth);
      line.append(CliConstants.SPACE);
      int numToWrite = Math.min(fieldColSize, fieldLen - fieldStartIdx);
      if (numToWrite > 0) {
        line.append(field, fieldStartIdx, fieldStartIdx + numToWrite);
        fieldStartIdx += numToWrite;
      }
      CliUtil.appendTo(line, seperatorPos - 1, CliConstants.SPACE);
      line.append(seperator);
      line.append(CliConstants.SPACE);

      numToWrite = Math.min(typeColSize, typeLen - typeStartIdx);
      if (numToWrite > 0) {
        line.append(type, typeStartIdx, typeStartIdx + numToWrite);
        typeStartIdx += numToWrite;
      }

      AttributedStringBuilder attrBuilder = new AttributedStringBuilder().style(rowStyle);
      attrBuilder.append(line.toString());
      lines.add(attrBuilder.toAnsi());
    }
  }

  // Footer
  line = new StringBuilder(terminalWidth);
  CliUtil.appendTo(line, longestLineCharNum - 1, lineSep);
  lines.add(line.toString());
  return lines;
}
/**
 * Builds the text shown for a field's type. Non-complex types are rendered as
 * the type's own string form; ARRAY, MAP and ROW are rendered recursively as
 * {@code ARRAY(elem)}, {@code MAP(STRING, value)} and {@code ROW(t1,t2,...)}.
 *
 * @param fieldSchema schema of the field to render
 * @return the display text for the field's type
 * @throws UnsupportedOperationException if a type is reported as complex but is none of ARRAY, MAP, ROW
 */
private String getFieldDisplayValue(SqlFieldSchema fieldSchema) {
  final SamzaSqlFieldType fieldType = fieldSchema.getFieldType();
  if (!isComplexField(fieldType)) {
    return fieldType.toString();
  }

  if (fieldType == SamzaSqlFieldType.ARRAY) {
    String elementText = getFieldDisplayValue(fieldSchema.getElementSchema());
    return String.format("ARRAY(%s)", elementText);
  }

  if (fieldType == SamzaSqlFieldType.MAP) {
    // The key type is always shown as STRING; only the value type is resolved.
    String keyText = SamzaSqlFieldType.STRING.toString();
    String valueText = getFieldDisplayValue(fieldSchema.getValueSchema());
    return String.format("MAP(%s, %s)", keyText, valueText);
  }

  if (fieldType == SamzaSqlFieldType.ROW) {
    String memberTexts = fieldSchema.getRowSchema()
        .getFields()
        .stream()
        .map(member -> getFieldDisplayValue(member.getFieldSchema()))
        .collect(Collectors.joining(","));
    return String.format("ROW(%s)", memberTexts);
  }

  throw new UnsupportedOperationException("Unknown field type " + fieldType);
}
/**
 * Tells whether the given type is one of the nested types (ARRAY, MAP, ROW).
 *
 * @param fieldtype the type to test
 * @return true for ARRAY, MAP and ROW; false for anything else
 */
private boolean isComplexField(SamzaSqlFieldType fieldtype) {
  if (fieldtype == SamzaSqlFieldType.ARRAY) {
    return true;
  }
  if (fieldtype == SamzaSqlFieldType.MAP) {
    return true;
  }
  return fieldtype == SamzaSqlFieldType.ROW;
}
/**
 * Parses a whitespace-separated list of execution ids. Each token that is not an
 * integer, or is not a key of {@code executions}, is reported on the writer as
 * "Error: &lt;token&gt; is not a valid id." and left out of the result.
 *
 * @param parameters the raw parameter string; null or blank yields an empty list
 * @return the known execution ids, in the order given
 */
private List<Integer> splitExecutionIds(String parameters) {
  List<Integer> execIds = new ArrayList<>();
  if (parameters == null) {
    return execIds;
  }
  String trimmed = parameters.trim();
  if (trimmed.isEmpty()) {
    return execIds;
  }

  // Split on runs of whitespace: splitting on a single space produced empty
  // tokens (and bogus "is not a valid id" errors) for repeated spaces or tabs.
  for (String param : trimmed.split("\\s+")) {
    Integer id = null;
    try {
      id = Integer.valueOf(param);
    } catch (NumberFormatException ignored) {
      // Not a number: id stays null and is reported as invalid below.
    }
    if (id == null || !executions.containsKey(id)) {
      writer.print("Error: ");
      writer.print(param);
      writer.println(" is not a valid id.");
    } else {
      execIds.add(id);
    }
  }
  return execIds;
}
}