blob: 757099b4d7140598bd864ca0d4e525d887c02277 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.table.client;
import org.apache.flink.table.client.cli.CliClient;
import org.apache.flink.table.client.cli.CliOptions;
import org.apache.flink.table.client.cli.CliOptionsParser;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.client.gateway.Executor;
import org.apache.flink.table.client.gateway.SessionContext;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.client.gateway.local.LocalExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URL;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* SQL Client for submitting SQL statements. The client can be executed in two
* modes: a gateway and embedded mode.
*
* <p>- In embedded mode, the SQL CLI is tightly coupled with the executor in a common process. This
* allows for submitting jobs without having to start an additional component.
*
* <p>- In future versions: In gateway mode, the SQL CLI client connects to the REST API of the gateway
* and allows for managing queries via console.
*
* <p>For debugging in an IDE you can execute the main method of this class using:
* "embedded --defaults /path/to/sql-client-defaults.yaml --jar /path/to/target/flink-sql-client-*.jar"
*
* <p>Make sure that the FLINK_CONF_DIR environment variable is set.
*/
public class SqlClient {
private static final Logger LOG = LoggerFactory.getLogger(SqlClient.class);
private final boolean isEmbedded;
private final CliOptions options;
public static final String MODE_EMBEDDED = "embedded";
public static final String MODE_GATEWAY = "gateway";
public static final String DEFAULT_SESSION_ID = "default";
public SqlClient(boolean isEmbedded, CliOptions options) {
this.isEmbedded = isEmbedded;
this.options = options;
}
private void start() {
if (isEmbedded) {
// create local executor with default environment
final List<URL> jars;
if (options.getJars() != null) {
jars = options.getJars();
} else {
jars = Collections.emptyList();
}
final List<URL> libDirs;
if (options.getLibraryDirs() != null) {
libDirs = options.getLibraryDirs();
} else {
libDirs = Collections.emptyList();
}
final Executor executor = new LocalExecutor(options.getDefaults(), jars, libDirs);
executor.start();
// create CLI client with session environment
final Environment sessionEnv = readSessionEnvironment(options.getEnvironment());
final SessionContext context;
if (options.getSessionId() == null) {
context = new SessionContext(DEFAULT_SESSION_ID, sessionEnv);
} else {
context = new SessionContext(options.getSessionId(), sessionEnv);
}
// validate the environment (defaults and session)
validateEnvironment(context, executor);
// add shutdown hook
Runtime.getRuntime().addShutdownHook(new EmbeddedShutdownThread(context, executor));
// do the actual work
openCli(context, executor);
} else {
throw new SqlClientException("Gateway mode is not supported yet.");
}
}
/**
* Opens the CLI client for executing SQL statements.
*
* @param context session context
* @param executor executor
*/
private void openCli(SessionContext context, Executor executor) {
final CliClient cli = new CliClient(context, executor);
// interactive CLI mode
if (options.getUpdateStatement() == null) {
cli.open();
}
// execute single update statement
else {
final boolean success = cli.submitUpdate(options.getUpdateStatement());
if (!success) {
throw new SqlClientException("Could not submit given SQL update statement to cluster.");
}
}
}
// --------------------------------------------------------------------------------------------
private static void validateEnvironment(SessionContext context, Executor executor) {
System.out.print("Validating current environment...");
try {
executor.validateSession(context);
System.out.println("done.");
} catch (SqlExecutionException e) {
throw new SqlClientException(
"The configured environment is invalid. Please check your environment files again.", e);
}
}
private static void shutdown(SessionContext context, Executor executor) {
System.out.println();
System.out.print("Shutting down executor...");
executor.stop(context);
System.out.println("done.");
}
private static Environment readSessionEnvironment(URL envUrl) {
// use an empty environment by default
if (envUrl == null) {
System.out.println("No session environment specified.");
return new Environment();
}
System.out.println("Reading session environment from: " + envUrl);
LOG.info("Using session environment file: {}", envUrl);
try {
return Environment.parse(envUrl);
} catch (IOException e) {
throw new SqlClientException("Could not read session environment file at: " + envUrl, e);
}
}
// --------------------------------------------------------------------------------------------
public static void main(String[] args) {
if (args.length < 1) {
CliOptionsParser.printHelpClient();
return;
}
switch (args[0]) {
case MODE_EMBEDDED:
// remove mode
final String[] modeArgs = Arrays.copyOfRange(args, 1, args.length);
final CliOptions options = CliOptionsParser.parseEmbeddedModeClient(modeArgs);
if (options.isPrintHelp()) {
CliOptionsParser.printHelpEmbeddedModeClient();
} else {
try {
final SqlClient client = new SqlClient(true, options);
client.start();
} catch (SqlClientException e) {
// make space in terminal
System.out.println();
System.out.println();
LOG.error("SQL Client must stop.", e);
throw e;
} catch (Throwable t) {
// make space in terminal
System.out.println();
System.out.println();
LOG.error("SQL Client must stop. Unexpected exception. This is a bug. Please consider filing an issue.", t);
throw new SqlClientException("Unexpected exception. This is a bug. Please consider filing an issue.", t);
}
}
break;
case MODE_GATEWAY:
throw new SqlClientException("Gateway mode is not supported yet.");
default:
CliOptionsParser.printHelpClient();
}
}
// --------------------------------------------------------------------------------------------
private class EmbeddedShutdownThread extends Thread {
private final SessionContext context;
private final Executor executor;
public EmbeddedShutdownThread(SessionContext context, Executor executor) {
this.context = context;
this.executor = executor;
}
@Override
public void run() {
shutdown(context, executor);
}
}
}