blob: 9727a27b7c9a97788626856ab08d25aaa758e203 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.lens;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.regex.Matcher;
import java.util.Map;
import java.util.LinkedHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.io.ByteArrayOutputStream;
import org.apache.lens.client.LensClient;
import org.apache.lens.client.LensClientConfig;
import org.apache.lens.client.LensClientSingletonWrapper;
import org.apache.lens.cli.commands.BaseLensCommand;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResult.Code;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.shell.Bootstrap;
import org.springframework.shell.core.CommandResult;
import org.springframework.shell.core.JLineShell;
import org.springframework.shell.core.JLineShellComponent;
import org.springframework.shell.support.logging.HandlerUtils;
/**
* Lens interpreter for Zeppelin.
*/
public class LensInterpreter extends Interpreter {
static final Logger s_logger = LoggerFactory.getLogger(LensInterpreter.class);
static final String LENS_CLIENT_DBNAME = "lens.client.dbname";
static final String LENS_SERVER_URL = "lens.server.base.url";
static final String LENS_SESSION_CLUSTER_USER = "lens.session.cluster.user";
static final String LENS_PERSIST_RESULTSET = "lens.query.enable.persistent.resultset";
static final String ZEPPELIN_LENS_RUN_CONCURRENT_SESSION = "zeppelin.lens.run.concurrent";
static final String ZEPPELIN_LENS_CONCURRENT_SESSIONS = "zeppelin.lens.maxThreads";
static final String ZEPPELIN_MAX_ROWS = "zeppelin.lens.maxResults";
static final Map<String, Pattern> LENS_TABLE_FORMAT_REGEX = new LinkedHashMap<String, Pattern>() {
{
put("cubes", Pattern.compile(".*show\\s+cube.*"));
put("nativetables", Pattern.compile(".*show\\s+nativetable.*"));
put("storages", Pattern.compile(".*show\\s+storage.*"));
put("facts", Pattern.compile(".*show\\s+fact.*"));
put("dimensions", Pattern.compile(".*show\\s+dimension.*"));
put("params", Pattern.compile(".*show\\s+param.*"));
put("databases", Pattern.compile(".*show\\s+database.*"));
put("query results", Pattern.compile(".*query\\s+results.*"));
}
};
private static Pattern s_queryExecutePattern = Pattern.compile(".*query\\s+execute\\s+(.*)");
private static Map<String, ExecutionDetail> s_paraToQH =
new ConcurrentHashMap<> (); //tracks paragraphId -> Lens QueryHandle
private static Map<LensClient, Boolean> s_clientMap =
new ConcurrentHashMap<>();
private int m_maxResults;
private int m_maxThreads;
private JLineShell m_shell;
private LensClientConfig m_lensConf;
private Bootstrap m_bs;
private LensClient m_lensClient;
public LensInterpreter(Properties property) {
super(property);
try {
m_lensConf = new LensClientConfig();
m_lensConf.set(LENS_SERVER_URL, property.get(LENS_SERVER_URL).toString());
m_lensConf.set(LENS_CLIENT_DBNAME, property.get(LENS_CLIENT_DBNAME).toString());
m_lensConf.set(LENS_SESSION_CLUSTER_USER, property.get(LENS_SESSION_CLUSTER_USER).toString());
m_lensConf.set(LENS_PERSIST_RESULTSET, property.get(LENS_PERSIST_RESULTSET).toString());
try {
m_maxResults = Integer.parseInt(property.get(ZEPPELIN_MAX_ROWS).toString());
} catch (NumberFormatException|NullPointerException e) {
m_maxResults = 1000;
s_logger.error("unable to parse " + ZEPPELIN_MAX_ROWS + " :"
+ property.get(ZEPPELIN_MAX_ROWS), e);
}
try {
m_maxThreads = Integer.parseInt(property.get(ZEPPELIN_LENS_CONCURRENT_SESSIONS).toString());
} catch (NumberFormatException|NullPointerException e) {
m_maxThreads = 10;
s_logger.error("unable to parse " + ZEPPELIN_LENS_CONCURRENT_SESSIONS + " :"
+ property.get(ZEPPELIN_LENS_CONCURRENT_SESSIONS), e);
}
s_logger.info("LensInterpreter created");
}
catch (Exception e) {
s_logger.error(e.toString(), e);
s_logger.error("unable to create lens interpreter", e);
}
}
private Bootstrap createBootstrap() {
return new LensBootstrap();
}
private JLineShell getJLineShell(Bootstrap bs) {
if (bs instanceof LensBootstrap) {
return ((LensBootstrap) bs).getLensJLineShellComponent();
} else {
return bs.getJLineShellComponent();
}
}
protected void init() {
try {
m_bs = createBootstrap();
m_shell = getJLineShell(m_bs);
} catch (Exception ex) {
s_logger.error("could not initialize commandLine", ex);
}
}
@Override
public void open() {
s_logger.info("LensInterpreter opening");
m_lensClient = new LensClient(m_lensConf);
LensClientSingletonWrapper.instance().setClient(m_lensClient);
init();
s_logger.info("LensInterpreter opened");
}
@Override
public void close() {
closeConnections();
s_logger.info("LensInterpreter closed");
}
private static void closeConnections() {
for (LensClient cl : s_clientMap.keySet()) {
if (cl.isConnectionOpen()) {
closeLensClient(cl);
}
}
}
private static void closeLensClient(LensClient lensClient) {
try {
lensClient.closeConnection();
} catch (Exception e) {
s_logger.error("unable to close lensClient", e);
}
}
private LensClient createAndSetLensClient(Bootstrap bs) {
LensClient lensClient = null;
try {
lensClient = new LensClient(m_lensConf);
for (String beanName : bs.getApplicationContext().getBeanDefinitionNames()) {
if (bs.getApplicationContext().getBean(beanName) instanceof BaseLensCommand) {
((BaseLensCommand) bs.getApplicationContext().getBean(beanName))
.setClient(lensClient);
}
}
} catch (Exception e) {
s_logger.error("unable to create lens client", e);
throw e;
}
return lensClient;
}
private InterpreterResult HandleHelp(JLineShell shell, String st) {
java.util.logging.StreamHandler sh = null;
java.util.logging.Logger springLogger = null;
java.util.logging.Formatter formatter = new java.util.logging.Formatter() {
public String format(java.util.logging.LogRecord record) {
return record.getMessage();
}
};
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
sh = new java.util.logging.StreamHandler(baos, formatter);
springLogger = HandlerUtils.getLogger(org.springframework.shell.core.SimpleParser.class);
springLogger.addHandler(sh);
shell.executeCommand(st);
} catch (Exception e) {
s_logger.error(e.getMessage(), e);
return new InterpreterResult(Code.ERROR, e.getMessage());
}
finally {
sh.flush();
springLogger.removeHandler(sh);
sh.close();
}
return new InterpreterResult(Code.SUCCESS, baos.toString());
}
private String modifyQueryStatement(String st) {
Matcher matcher = s_queryExecutePattern.matcher(st.toLowerCase());
if (!matcher.find()) {
return st;
}
StringBuilder sb = new StringBuilder("query execute ");
if (!st.toLowerCase().matches(".*--async\\s+true")) {
sb.append("--async true ");
}
sb.append(matcher.group(1));
if (!st.toLowerCase().matches(".*limit\\s+\\d+.*")) {
sb.append(" limit ");
sb.append(m_maxResults);
}
return sb.toString();
}
@Override
public InterpreterResult interpret(String input, InterpreterContext context) {
if (input == null || input.length() == 0) {
return new InterpreterResult(Code.ERROR, "no command submitted");
}
String st = input.replaceAll("\\n", " ");
s_logger.info("LensInterpreter command: " + st);
Bootstrap bs = createBootstrap();
JLineShell shell = getJLineShell(bs);
CommandResult res = null;
LensClient lensClient = null;
String qh = null;
if (st.trim().startsWith("help")) {
return HandleHelp(shell, st);
}
try {
lensClient = createAndSetLensClient(bs);
s_clientMap.put(lensClient, true);
String lensCommand = modifyQueryStatement(st);
s_logger.info("executing command : " + lensCommand);
res = shell.executeCommand(lensCommand);
if (!lensCommand.equals(st) && res != null
&& res.getResult() != null
&& res.getResult().toString().trim().matches("[a-z0-9-]+")) {
// setup query progress tracking
qh = res.getResult().toString();
s_paraToQH.put(context.getParagraphId(),
new ExecutionDetail(qh, lensClient, shell));
String getResultsCmd = "query results --async false " + qh;
s_logger.info("executing query results command : " + context.getParagraphId()
+ " : " + getResultsCmd);
res = shell.executeCommand(getResultsCmd);
s_paraToQH.remove(context.getParagraphId());
}
} catch (Exception ex) {
s_logger.error("error in interpret", ex);
return new InterpreterResult(Code.ERROR, ex.getMessage());
}
finally {
if (shell != null) {
closeShell(shell);
}
if (lensClient != null) {
closeLensClient(lensClient);
s_clientMap.remove(lensClient);
}
if (qh != null) {
s_paraToQH.remove(context.getParagraphId());
}
}
return new InterpreterResult(Code.SUCCESS, formatResult(st, res));
}
private void closeShell(JLineShell shell) {
if (shell instanceof LensJLineShellComponent) {
((LensJLineShellComponent) shell).stop();
} else {
((JLineShellComponent) shell).stop();
}
}
private String formatResult(String st, CommandResult result) {
if (result == null) {
return "error in interpret, no result object returned";
}
if (!result.isSuccess() || result.getResult() == null) {
if (result.getException() != null) {
return result.getException().getMessage();
//try describe cube (without cube name)- error is written as a warning,
//but not returned to result object
} else {
return "error in interpret, unable to execute command";
}
}
StringBuilder sb = new StringBuilder();
for (Map.Entry<String, Pattern> entry : LENS_TABLE_FORMAT_REGEX.entrySet()) {
if (entry.getValue().matcher(st.toLowerCase()).find()) {
sb.append("%table " + entry.getKey() + " \n");
break;
}
}
if (s_queryExecutePattern.matcher(st.toLowerCase()).find() &&
result.getResult().toString().contains(" rows process in (")) {
sb.append("%table ");
}
if (sb.length() > 0) {
return sb.append(result.getResult().toString()).toString();
}
return result.getResult().toString();
//Lens sends error messages without setting result.isSuccess() = false.
}
@Override
public void cancel(InterpreterContext context) {
if (!s_paraToQH.containsKey(context.getParagraphId())) {
s_logger.error("ignoring cancel from " + context.getParagraphId());
return;
}
String qh = s_paraToQH.get(context.getParagraphId()).getQueryHandle();
s_logger.info("preparing to cancel : (" + context.getParagraphId() + ") :" + qh);
Bootstrap bs = createBootstrap();
JLineShell shell = getJLineShell(bs);
LensClient lensClient = null;
try {
lensClient = createAndSetLensClient(bs);
s_clientMap.put(lensClient, true);
s_logger.info("invoke query kill (" + context.getParagraphId() + ") " + qh);
CommandResult res = shell.executeCommand("query kill " + qh);
s_logger.info("query kill returned (" + context.getParagraphId() + ") " + qh
+ " with: " + res.getResult());
} catch (Exception e) {
s_logger.error("unable to kill query ("
+ context.getParagraphId() + ") " + qh, e);
} finally {
try {
if (lensClient != null) {
closeLensClient(lensClient);
s_clientMap.remove(lensClient);
}
closeLensClient(s_paraToQH.get(context.getParagraphId()).getLensClient());
closeShell(s_paraToQH.get(context.getParagraphId()).getShell());
} catch (Exception e) {
// ignore
s_logger.info("Exception in LensInterpreter while cancel finally, ignore", e);
}
s_paraToQH.remove(context.getParagraphId());
closeShell(shell);
}
}
@Override
public FormType getFormType() {
return FormType.SIMPLE;
}
@Override
public int getProgress(InterpreterContext context) {
if (s_paraToQH.containsKey(context.getParagraphId())) {
s_logger.info("number of items for which progress can be reported :" + s_paraToQH.size());
s_logger.info("number of open lensclient :" + s_clientMap.size());
Bootstrap bs = createBootstrap();
JLineShell shell = getJLineShell(bs);
LensClient lensClient = null;
String qh = s_paraToQH.get(context.getParagraphId()).getQueryHandle();
try {
s_logger.info("fetch query status for : (" + context.getParagraphId() + ") :" + qh);
lensClient = createAndSetLensClient(bs);
s_clientMap.put(lensClient, true);
CommandResult res = shell.executeCommand("query status " + qh);
s_logger.info(context.getParagraphId() + " --> " + res.getResult().toString());
//change to debug
Pattern pattern = Pattern.compile(".*(Progress : (\\d\\.\\d)).*");
Matcher matcher = pattern.matcher(res.getResult().toString().replaceAll("\\n", " "));
if (matcher.find(2)) {
Double d = Double.parseDouble(matcher.group(2)) * 100;
if (d.intValue() == 100) {
s_paraToQH.remove(context.getParagraphId());
}
return d.intValue();
} else {
return 1;
}
}
catch (Exception e) {
s_logger.error("unable to get progress for (" + context.getParagraphId() + ") :" + qh, e);
s_paraToQH.remove(context.getParagraphId());
return 0;
} finally {
if (lensClient != null) {
closeLensClient(lensClient);
s_clientMap.remove(lensClient);
}
if (shell != null) {
closeShell(shell);
}
}
}
return 0;
}
@Override
public List<InterpreterCompletion> completion(String buf, int cursor,
InterpreterContext interpreterContext) {
return null;
}
public boolean concurrentRequests() {
return Boolean.parseBoolean(getProperty(ZEPPELIN_LENS_RUN_CONCURRENT_SESSION));
}
@Override
public Scheduler getScheduler() {
if (concurrentRequests()) {
return SchedulerFactory.singleton().createOrGetParallelScheduler(
LensInterpreter.class.getName() + this.hashCode(), m_maxThreads);
} else {
return super.getScheduler();
}
}
}