blob: aed79de3da95f2520d7bd7b4546f25f76d60f3b9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.client.cli;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.client.SqlClientException;
import org.apache.flink.table.client.cli.SqlCommandParser.SqlCommandCall;
import org.apache.flink.table.client.config.entries.ViewEntry;
import org.apache.flink.table.client.gateway.Executor;
import org.apache.flink.table.client.gateway.ProgramTargetDescriptor;
import org.apache.flink.table.client.gateway.ResultDescriptor;
import org.apache.flink.table.client.gateway.SessionContext;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.jline.reader.EndOfFileException;
import org.jline.reader.LineReader;
import org.jline.reader.LineReaderBuilder;
import org.jline.reader.MaskingCallback;
import org.jline.reader.UserInterruptException;
import org.jline.terminal.Terminal;
import org.jline.terminal.TerminalBuilder;
import org.jline.utils.AttributedString;
import org.jline.utils.AttributedStringBuilder;
import org.jline.utils.AttributedStyle;
import org.jline.utils.InfoCmp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOError;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* SQL CLI client.
*/
public class CliClient {
private static final Logger LOG = LoggerFactory.getLogger(CliClient.class);
private final Executor executor;
private final SessionContext context;
private final Terminal terminal;
private final LineReader lineReader;
private final String prompt;
private boolean isRunning;
private static final int PLAIN_TERMINAL_WIDTH = 80;
private static final int PLAIN_TERMINAL_HEIGHT = 30;
private static final int SOURCE_MAX_SIZE = 50_000;
public CliClient(SessionContext context, Executor executor) {
this.context = context;
this.executor = executor;
try {
// initialize terminal
terminal = TerminalBuilder.builder()
.name(CliStrings.CLI_NAME)
.build();
// make space from previous output and test the writer
terminal.writer().println();
terminal.writer().flush();
} catch (IOException e) {
throw new SqlClientException("Error opening command line interface.", e);
}
// initialize line lineReader
lineReader = LineReaderBuilder.builder()
.terminal(terminal)
.appName(CliStrings.CLI_NAME)
.parser(new SqlMultiLineParser())
.completer(new SqlCompleter(context, executor))
.build();
// this option is disabled for now for correct backslash escaping
// a "SELECT '\'" query should return a string with a backslash
lineReader.option(LineReader.Option.DISABLE_EVENT_EXPANSION, true);
// set strict "typo" distance between words when doing code completion
lineReader.setVariable(LineReader.ERRORS, 1);
// perform code completion case insensitive
lineReader.option(LineReader.Option.CASE_INSENSITIVE, true);
// create prompt
prompt = new AttributedStringBuilder()
.style(AttributedStyle.DEFAULT.foreground(AttributedStyle.GREEN))
.append("Flink SQL")
.style(AttributedStyle.DEFAULT)
.append("> ")
.toAnsi();
}
public Terminal getTerminal() {
return terminal;
}
public SessionContext getContext() {
return context;
}
public void clearTerminal() {
if (isPlainTerminal()) {
for (int i = 0; i < 200; i++) { // large number of empty lines
terminal.writer().println();
}
} else {
terminal.puts(InfoCmp.Capability.clear_screen);
}
}
public boolean isPlainTerminal() {
// check if terminal width can be determined
// e.g. IntelliJ IDEA terminal supports only a plain terminal
return terminal.getWidth() == 0 && terminal.getHeight() == 0;
}
public int getWidth() {
if (isPlainTerminal()) {
return PLAIN_TERMINAL_WIDTH;
}
return terminal.getWidth();
}
public int getHeight() {
if (isPlainTerminal()) {
return PLAIN_TERMINAL_HEIGHT;
}
return terminal.getHeight();
}
public Executor getExecutor() {
return executor;
}
/**
* Opens the interactive CLI shell.
*/
public void open() {
isRunning = true;
// print welcome
terminal.writer().append(CliStrings.MESSAGE_WELCOME);
// begin reading loop
while (isRunning) {
// make some space to previous command
terminal.writer().append("\n");
terminal.flush();
final String line;
try {
line = lineReader.readLine(prompt, null, (MaskingCallback) null, null);
} catch (UserInterruptException e) {
// user cancelled line with Ctrl+C
continue;
} catch (EndOfFileException | IOError e) {
// user cancelled application with Ctrl+D or kill
break;
} catch (Throwable t) {
throw new SqlClientException("Could not read from command line.", t);
}
if (line == null) {
continue;
}
final Optional<SqlCommandCall> cmdCall = parseCommand(line);
cmdCall.ifPresent(this::callCommand);
}
}
/**
* Submits a SQL update statement and prints status information and/or errors on the terminal.
*
* @param statement SQL update statement
* @return flag to indicate if the submission was successful or not
*/
public boolean submitUpdate(String statement) {
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_WILL_EXECUTE).toAnsi());
terminal.writer().println(new AttributedString(statement).toString());
terminal.flush();
final Optional<SqlCommandCall> parsedStatement = parseCommand(statement);
// only support INSERT INTO
return parsedStatement.map(cmdCall -> {
switch (cmdCall.command) {
case INSERT_INTO:
return callInsertInto(cmdCall);
default:
printError(CliStrings.MESSAGE_UNSUPPORTED_SQL);
return false;
}
}).orElse(false);
}
// --------------------------------------------------------------------------------------------
private Optional<SqlCommandCall> parseCommand(String line) {
final Optional<SqlCommandCall> parsedLine = SqlCommandParser.parse(line);
if (!parsedLine.isPresent()) {
printError(CliStrings.MESSAGE_UNKNOWN_SQL);
}
return parsedLine;
}
private void callCommand(SqlCommandCall cmdCall) {
switch (cmdCall.command) {
case QUIT:
callQuit();
break;
case CLEAR:
callClear();
break;
case RESET:
callReset();
break;
case SET:
callSet(cmdCall);
break;
case HELP:
callHelp();
break;
case SHOW_CATALOGS:
callShowCatalogs();
break;
case SHOW_DATABASES:
callShowDatabases();
break;
case SHOW_TABLES:
callShowTables();
break;
case SHOW_FUNCTIONS:
callShowFunctions();
break;
case USE:
callUseDatabase(cmdCall);
break;
case DESCRIBE:
case DESC:
callDescribe(cmdCall);
break;
case EXPLAIN:
callExplain(cmdCall);
break;
case SELECT:
callSelect(cmdCall);
break;
case INSERT_INTO:
callInsertInto(cmdCall);
break;
case CREATE_TABLE:
callCreateTable(cmdCall);
break;
case CREATE_VIEW:
callCreateView(cmdCall);
break;
case CREATE_FUNCTION:
callCreateFunction(cmdCall);
break;
case SOURCE:
callSource(cmdCall);
break;
default:
throw new SqlClientException("Unsupported command: " + cmdCall.command);
}
}
private void callQuit() {
printInfo(CliStrings.MESSAGE_QUIT);
isRunning = false;
}
private void callClear() {
clearTerminal();
}
private void callReset() {
context.resetSessionProperties();
printInfo(CliStrings.MESSAGE_RESET);
}
private void callSet(SqlCommandCall cmdCall) {
// show all properties
if (cmdCall.operands.length == 0) {
final Map<String, String> properties;
try {
properties = executor.getSessionProperties(context);
} catch (SqlExecutionException e) {
printExecutionException(e);
return;
}
if (properties.isEmpty()) {
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_EMPTY).toAnsi());
} else {
properties
.entrySet()
.stream()
.map((e) -> e.getKey() + "=" + e.getValue())
.sorted()
.forEach((p) -> terminal.writer().println(p));
}
}
// set a property
else {
context.setSessionProperty(cmdCall.operands[0], cmdCall.operands[1]);
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_SET).toAnsi());
}
terminal.flush();
}
private void callHelp() {
terminal.writer().println(CliStrings.MESSAGE_HELP);
terminal.flush();
}
private void callShowCatalogs() {
final List<String> catalogs;
try {
catalogs = executor.listCatalogs(context);
} catch (SqlExecutionException e) {
printExecutionException(e);
return;
}
if (catalogs.isEmpty()) {
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_EMPTY).toAnsi());
} else {
catalogs.forEach((v) -> terminal.writer().println(v));
}
terminal.flush();
}
private void callShowDatabases() {
final List<String> dbs;
try {
dbs = executor.listDatabases(context);
} catch (SqlExecutionException e) {
printExecutionException(e);
return;
}
if (dbs.isEmpty()) {
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_EMPTY).toAnsi());
} else {
dbs.forEach((v) -> terminal.writer().println(v));
}
terminal.flush();
}
private void callShowTables() {
final List<String> tables;
try {
tables = executor.listTables(context);
} catch (SqlExecutionException e) {
printExecutionException(e);
return;
}
if (tables.isEmpty()) {
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_EMPTY).toAnsi());
} else {
tables.forEach((v) -> terminal.writer().println(v));
}
terminal.flush();
}
private void callShowFunctions() {
final List<String> functions;
try {
functions = executor.listUserDefinedFunctions(context);
} catch (SqlExecutionException e) {
printExecutionException(e);
return;
}
if (functions.isEmpty()) {
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_EMPTY).toAnsi());
} else {
functions.forEach((v) -> terminal.writer().println(v));
}
terminal.flush();
}
private void callUseDatabase(SqlCommandCall cmdCall) {
try {
executor.setDefaultDatabase(context, cmdCall.operands[0]);
} catch (SqlExecutionException e) {
printExecutionException(e);
return;
}
terminal.flush();
}
private void callDescribe(SqlCommandCall cmdCall) {
final TableSchema schema;
try {
schema = executor.getTableSchema(context, cmdCall.operands[0]);
} catch (SqlExecutionException e) {
printExecutionException(e);
return;
}
terminal.writer().println(schema.toString());
terminal.flush();
}
private void callExplain(SqlCommandCall cmdCall) {
final String explanation;
try {
explanation = executor.explainStatement(context, cmdCall.operands[0]);
} catch (SqlExecutionException e) {
printExecutionException(e);
return;
}
terminal.writer().println(explanation);
terminal.flush();
}
private void callSelect(SqlCommandCall cmdCall) {
final ResultDescriptor resultDesc;
try {
resultDesc = executor.executeQuery(context, cmdCall.operands[0]);
} catch (SqlExecutionException e) {
printExecutionException(e);
return;
}
final CliResultView view;
if (resultDesc.isMaterialized()) {
view = new CliTableResultView(this, resultDesc);
} else {
view = new CliChangelogResultView(this, resultDesc);
}
// enter view
try {
view.open();
// view left
printInfo(CliStrings.MESSAGE_RESULT_QUIT);
} catch (SqlExecutionException e) {
printExecutionException(e);
}
}
private boolean callInsertInto(SqlCommandCall cmdCall) {
printInfo(CliStrings.MESSAGE_SUBMITTING_STATEMENT);
try {
final ProgramTargetDescriptor programTarget = executor.executeUpdate(context, cmdCall.operands[0]);
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_STATEMENT_SUBMITTED).toAnsi());
terminal.writer().println(programTarget.toString());
terminal.flush();
} catch (SqlExecutionException e) {
printExecutionException(e);
return false;
}
return true;
}
private void callCreateTable(SqlCommandCall cmdCall) {
try {
executor.createTable(context, cmdCall.operands[0]);
printInfo(CliStrings.MESSAGE_TABLE_CREATE);
} catch (SqlExecutionException e) {
printExecutionException(e);
}
}
private void callCreateView(SqlCommandCall cmdCall) {
try {
executor.createView(context, cmdCall.operands[0]);
printInfo(CliStrings.MESSAGE_VIEW_CREATED);
} catch (SqlExecutionException e) {
printExecutionException(e);
}
}
private void callCreateFunction(SqlCommandCall cmdCall) {
try {
executor.createFunction(context, cmdCall.operands[0]);
printInfo(CliStrings.MESSAGE_FUNCTION_CREATE);
} catch (SqlExecutionException e) {
printExecutionException(e);
}
}
private void callDropView(SqlCommandCall cmdCall) {
final String name = cmdCall.operands[0];
final ViewEntry view = context.getViews().get(name);
if (view == null) {
printExecutionError(CliStrings.MESSAGE_VIEW_NOT_FOUND);
return;
}
try {
// perform and validate change
context.removeView(name);
executor.validateSession(context);
printInfo(CliStrings.MESSAGE_VIEW_REMOVED);
} catch (SqlExecutionException e) {
// rollback change
context.addView(view);
printExecutionException(CliStrings.MESSAGE_VIEW_NOT_REMOVED, e);
}
}
private void callSource(SqlCommandCall cmdCall) {
final String pathString = cmdCall.operands[0];
// load file
final String stmt;
try {
final Path path = Paths.get(pathString);
byte[] encoded = Files.readAllBytes(path);
stmt = new String(encoded, Charset.defaultCharset());
} catch (IOException e) {
printExecutionException(e);
return;
}
// limit the output a bit
if (stmt.length() > SOURCE_MAX_SIZE) {
printExecutionError(CliStrings.MESSAGE_MAX_SIZE_EXCEEDED);
return;
}
terminal.writer().println(CliStrings.messageInfo(CliStrings.MESSAGE_WILL_EXECUTE).toAnsi());
terminal.writer().println(new AttributedString(stmt).toString());
terminal.flush();
// try to run it
final Optional<SqlCommandCall> call = parseCommand(stmt);
call.ifPresent(this::callCommand);
}
// --------------------------------------------------------------------------------------------
private void printExecutionException(Throwable t) {
printExecutionException(null, t);
}
private void printExecutionException(String message, Throwable t) {
final String finalMessage;
if (message == null) {
finalMessage = CliStrings.MESSAGE_SQL_EXECUTION_ERROR;
} else {
finalMessage = CliStrings.MESSAGE_SQL_EXECUTION_ERROR + ' ' + message;
}
printException(finalMessage, t);
}
private void printExecutionError(String message) {
terminal.writer().println(CliStrings.messageError(CliStrings.MESSAGE_SQL_EXECUTION_ERROR, message).toAnsi());
terminal.flush();
}
private void printException(String message, Throwable t) {
LOG.warn(message, t);
terminal.writer().println(CliStrings.messageError(message, t).toAnsi());
terminal.flush();
}
private void printError(String message) {
terminal.writer().println(CliStrings.messageError(message).toAnsi());
terminal.flush();
}
private void printInfo(String message) {
terminal.writer().println(CliStrings.messageInfo(message).toAnsi());
terminal.flush();
}
}