/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.livy;

import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
* Livy SparkSQL Interpreter for Zeppelin.
*/
public class LivySparkSQLInterpreter extends BaseLivyInterpreter {
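  // The underlying Scala interpreter that executes the generated spark.sql(...) calls.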
private LivySparkInterpreter sparkInterpreter;
private boolean isSpark2 = false;
private int maxResult = 1000;
public LivySparkSQLInterpreter(Properties property) {
super(property);
    // Fall back to the default of 1000 when the property is unset to avoid an NPE.
    this.maxResult = Integer.parseInt(
        property.getProperty("zeppelin.livy.spark.sql.maxResult", "1000"));
}
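  // SQL statements are executed by wrapping them in spark.sql(...) /
  // sqlContext.sql(...) calls inside the shared Scala session, so this
  // interpreter uses the "spark" session kind rather than a dedicated one.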
@Override
public String getSessionKind() {
return "spark";
}
@Override
public void open() {
this.sparkInterpreter = getSparkInterpreter();
    // Since we don't know whether the Livy server runs Spark 1.x or Spark 2.x,
    // probe for SparkSession to decide which one it is.
try {
InterpreterResult result = sparkInterpreter.interpret("spark", null, false, false);
if (result.code() == InterpreterResult.Code.SUCCESS &&
result.message().get(0).getData().contains("org.apache.spark.sql.SparkSession")) {
LOGGER.info("SparkSession is detected so we are using spark 2.x for session {}",
sparkInterpreter.getSessionInfo().id);
isSpark2 = true;
} else {
// spark 1.x
result = sparkInterpreter.interpret("sqlContext", null, false, false);
if (result.code() == InterpreterResult.Code.SUCCESS) {
LOGGER.info("sqlContext is detected.");
} else if (result.code() == InterpreterResult.Code.ERROR) {
          // Livy 0.2 does not provide a SQLContext out of the box,
          // so create one ourselves when it is missing.
          LOGGER.info("sqlContext is not detected, creating SQLContext manually");
result = sparkInterpreter.interpret(
"val sqlContext = new org.apache.spark.sql.SQLContext(sc)\n"
+ "import sqlContext.implicits._", null, false, false);
if (result.code() == InterpreterResult.Code.ERROR) {
throw new LivyException("Fail to create SQLContext," +
result.message().get(0).getData());
}
}
}
} catch (LivyException e) {
throw new RuntimeException("Fail to Detect SparkVersion", e);
}
}
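  /**
   * Finds the LivySparkInterpreter in the same session, unwrapping any
   * WrappedInterpreter layers and opening it via the LazyOpenInterpreter
   * wrapper (if present) so the returned interpreter is ready to use.
   */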
private LivySparkInterpreter getSparkInterpreter() {
LazyOpenInterpreter lazy = null;
LivySparkInterpreter spark = null;
Interpreter p = getInterpreterInTheSameSessionByClassName(LivySparkInterpreter.class.getName());
while (p instanceof WrappedInterpreter) {
if (p instanceof LazyOpenInterpreter) {
lazy = (LazyOpenInterpreter) p;
}
p = ((WrappedInterpreter) p).getInnerInterpreter();
}
spark = (LivySparkInterpreter) p;
if (lazy != null) {
lazy.open();
}
return spark;
}
@Override
public InterpreterResult interpret(String line, InterpreterContext context) {
try {
if (StringUtils.isEmpty(line)) {
return new InterpreterResult(InterpreterResult.Code.SUCCESS, "");
}
      // Use triple quotes so the SQL string does not need escaping.
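      // e.g. "select * from t" becomes spark.sql("""select * from t""").show(1000)
      // with the default maxResult of 1000.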
String sqlQuery = null;
if (isSpark2) {
sqlQuery = "spark.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ")";
} else {
sqlQuery = "sqlContext.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ")";
}
InterpreterResult result = sparkInterpreter.interpret(sqlQuery, context.getParagraphId(),
this.displayAppInfo, true);
if (result.code() == InterpreterResult.Code.SUCCESS) {
InterpreterResult result2 = new InterpreterResult(InterpreterResult.Code.SUCCESS);
for (InterpreterResultMessage message : result.message()) {
          // Convert Text output to Table output. We assume any Text message is the SQL
          // result, which holds for now. Ideally Livy would return a table type directly;
          // that may come in a future Livy release.
if (message.getType() == InterpreterResult.Type.TEXT) {
List<String> rows = parseSQLOutput(message.getData());
result2.add(InterpreterResult.Type.TABLE, StringUtils.join(rows, "\n"));
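            // rows includes the header row, so maxResult + 1 lines means show()
            // printed the full maxResult data rows and may have truncated the result.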
if (rows.size() >= (maxResult + 1)) {
              result2.add(InterpreterResult.Type.HTML,
                  "<font color=red>Results are limited to " + maxResult + " rows.</font>");
}
} else {
result2.add(message.getType(), message.getData());
}
}
return result2;
} else {
return result;
}
} catch (Exception e) {
LOGGER.error("Exception in LivySparkSQLInterpreter while interpret ", e);
return new InterpreterResult(InterpreterResult.Code.ERROR,
InterpreterUtils.getMostRelevantMessage(e));
}
}
@Override
public FormType getFormType() {
return FormType.SIMPLE;
}
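  /**
   * Parses the plain-text table that Spark's DataFrame.show() prints into
   * tab-separated rows (header row first). For example, the output
   *
   *   +---+-----+
   *   | id| name|
   *   +---+-----+
   *   |  1|alice|
   *   +---+-----+
   *
   * is parsed into ["id\tname", "1\talice"].
   */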
protected List<String> parseSQLOutput(String output) {
List<String> rows = new ArrayList<>();
String[] lines = output.split("\n");
// at least 4 lines, even for empty sql output
// +---+---+
// | a| b|
// +---+---+
// +---+---+
    // Use the first separator line to determine the position of each cell.
    // StringUtils.split treats every character in the separator string as a
    // delimiter, so a plain "+" is the correct separator set here.
    String[] tokens = StringUtils.split(lines[0], "+");
    // pairs keeps the start/end position of each cell, parsed from the first
    // row, which uses '+' as the separator.
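    // e.g. for the header line "+---+-----+" the pairs are (1, 4) and (5, 10).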
List<Pair> pairs = new ArrayList<>();
int start = 0;
int end = 0;
for (String token : tokens) {
start = end + 1;
end = start + token.length();
pairs.add(new Pair(start, end));
}
for (String line : lines) {
      // Only match lines of the form "|....|"; skip separator lines like
      // "+---+---+" and trailing notes like "only showing top 1 row".
if (line.matches("^\\|.*\\|$")) {
List<String> cells = new ArrayList<>();
for (Pair pair : pairs) {
          // trim the whitespace around each cell value
cells.add(line.substring(pair.start, pair.end).trim());
}
rows.add(StringUtils.join(cells, "\t"));
}
}
return rows;
}
  /**
   * Represents the start and end index of a cell.
   */
private static class Pair {
private int start;
private int end;
public Pair(int start, int end) {
this.start = start;
this.end = end;
}
}
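  /**
   * Whether SQL paragraphs may run concurrently, controlled by the
   * zeppelin.livy.concurrentSQL property.
   */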
public boolean concurrentSQL() {
return Boolean.parseBoolean(getProperty("zeppelin.livy.concurrentSQL"));
}
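  // With concurrent SQL enabled, hand out a parallel scheduler so multiple SQL
  // paragraphs can run at once; otherwise delegate to the Spark interpreter's
  // scheduler so SQL and Scala paragraphs share a single queue.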
@Override
public Scheduler getScheduler() {
if (concurrentSQL()) {
int maxConcurrency = 10;
return SchedulerFactory.singleton().createOrGetParallelScheduler(
LivySparkInterpreter.class.getName() + this.hashCode(), maxConcurrency);
} else {
Interpreter intp =
getInterpreterInTheSameSessionByClassName(LivySparkInterpreter.class.getName());
if (intp != null) {
return intp.getScheduler();
} else {
return null;
}
}
}
@Override
public void close() {
    if (this.sparkInterpreter != null) {
      this.sparkInterpreter.close();
    }
}
@Override
public int getProgress(InterpreterContext context) {
if (this.sparkInterpreter != null) {
return this.sparkInterpreter.getProgress(context);
} else {
return 0;
}
}
@Override
protected String extractAppId() throws LivyException {
    // It won't be called, because this work is delegated to LivySparkInterpreter.
throw new UnsupportedOperationException();
}
@Override
protected String extractWebUIAddress() throws LivyException {
    // It won't be called, because this work is delegated to LivySparkInterpreter.
throw new UnsupportedOperationException();
}
}