blob: a954e1dfa9a369ac08d4d5831ef07060d2b53da4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.jupyter;
import io.grpc.ManagedChannelBuilder;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
import org.apache.commons.exec.PumpStreamHandler;
import org.apache.commons.exec.environment.EnvironmentUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.interpreter.AbstractInterpreter;
import org.apache.zeppelin.interpreter.ZeppelinContext;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.jupyter.proto.CancelRequest;
import org.apache.zeppelin.interpreter.jupyter.proto.CompletionRequest;
import org.apache.zeppelin.interpreter.jupyter.proto.CompletionResponse;
import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteRequest;
import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteResponse;
import org.apache.zeppelin.interpreter.jupyter.proto.KernelStatus;
import org.apache.zeppelin.interpreter.jupyter.proto.StatusRequest;
import org.apache.zeppelin.interpreter.jupyter.proto.StatusResponse;
import org.apache.zeppelin.interpreter.jupyter.proto.StopRequest;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
import org.apache.zeppelin.interpreter.util.ProcessLauncher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
* Jupyter Kernel Interpreter for Zeppelin. One instance of this class represents one
* Jupyter Kernel. Extend this class to customize the Jupyter kernel,
* e.g. as IPythonInterpreter does.
*/
public class JupyterKernelInterpreter extends AbstractInterpreter {
// Class-wide logger; also handed to interpreterOutput below.
private static final Logger LOGGER = LoggerFactory.getLogger(JupyterKernelInterpreter.class);
// Launcher of the kernel server process; assigned in launchJupyterKernel(), reset to null in close().
private JupyterKernelProcessLauncher jupyterKernelProcessLauncher;
// Client used to talk to the kernel server over a gRPC channel; created in open().
protected JupyterKernelClient jupyterKernelClient;
// Zeppelin context produced by buildZeppelinContext() during open().
protected ZeppelinContext z;
// Kernel name; only assigned by the (String, Properties) constructor.
private String kernel;
// working directory of jupyter kernel
protected File kernelWorkDir;
// python executable file for launching the jupyter kernel
protected String pythonExecutable;
// Conda environment path; only set once activateCondaEnv() has succeeded.
protected String condaEnv;
// Kernel launch timeout in milliseconds, parsed from zeppelin.jupyter.kernel.launch.timeout in open().
private int kernelLaunchTimeout;
// Sink passed to stream_execute(); re-pointed at the paragraph's output on every interpret call.
private InterpreterOutputStream interpreterOutput = new InterpreterOutputStream(LOGGER);
/**
 * Creates an interpreter bound to the given kernel name.
 *
 * @param kernel name of the jupyter kernel; it is later passed as an argument to
 *               kernel_server.py when the kernel is launched
 * @param properties interpreter properties, forwarded to the single-argument constructor
 */
public JupyterKernelInterpreter(String kernel, Properties properties) {
this(properties);
this.kernel = kernel;
}
/**
 * Creates an interpreter without assigning a kernel name; the {@code kernel} field stays
 * unset, so {@link #getKernelName()} returns null unless a subclass overrides it.
 *
 * @param properties interpreter properties, forwarded to the superclass
 */
public JupyterKernelInterpreter(Properties properties) {
super(properties);
}
/**
 * @return the kernel name given at construction time, or null if none was given
 */
public String getKernelName() {
return this.kernel;
}
/**
 * Describes the Python packages that have to be present before the kernel can be launched.
 * Each predicate is evaluated against the complete output of {@code pip freeze}
 * (see {@link #checkKernelPrerequisite(String)}).
 *
 * @return predicates for jupyter-client (either spelling), grpcio and protobuf, in that order
 */
public List<PythonPackagePredicate<String>> getRequiredPackagesPredicates() {
  // A package name is followed by '=' when installed with pip, e.g.
  //   grpcio==1.18.0
  // and by ' ' when installed with conda, e.g.
  //   grpcio @ file:///home/conda/feedstock_root/build_artifacts/grpcio_1604365513151/work
  final List<PythonPackagePredicate<String>> predicates = new ArrayList<>();

  predicates.add(new PythonPackagePredicate<>(
      "jupyter-client or jupyter_client",
      frozen -> mentionsAny(frozen,
          "jupyter-client ", "jupyter-client=", "jupyter_client ", "jupyter_client=")));

  predicates.add(new PythonPackagePredicate<>(
      "grpcio",
      frozen -> mentionsAny(frozen, "grpcio ", "grpcio=")));

  predicates.add(new PythonPackagePredicate<>(
      "protobuf",
      frozen -> mentionsAny(frozen, "protobuf ", "protobuf=")));

  return predicates;
}

/** Returns true as soon as {@code text} contains one of {@code fragments}, tried in order. */
private static boolean mentionsAny(String text, String... fragments) {
  for (String fragment : fragments) {
    if (text.contains(fragment)) {
      return true;
    }
  }
  return false;
}
/**
 * Factory for the {@link ZeppelinContext} stored in {@code z} by {@link #open()}.
 * Protected so that subclasses can supply their own context.
 *
 * @return a new JupyterZeppelinContext built with (null, 1000)
 */
// NOTE(review): the meaning of the two constructor arguments is not visible here
// (presumably a hook registry and a max-result limit) -- confirm against JupyterZeppelinContext.
protected ZeppelinContext buildZeppelinContext() {
return new JupyterZeppelinContext(null, 1000);
}
/**
 * Opens the interpreter: resolves the python executable (activating the conda env named by
 * {@code zeppelin.interpreter.conda.env.name} when set, otherwise using
 * {@code zeppelin.python}), verifies the kernel prerequisites, creates the kernel client and
 * launches the kernel server process on a random free local port.
 *
 * <p>Calling this method on an already opened interpreter is a no-op. When opening fails,
 * everything that was partially started is released again so that a later call can retry
 * from scratch instead of being mistaken for "already opened".
 *
 * @throws InterpreterException if any step fails; the message embeds the stack trace of the
 *         underlying failure and the failure is kept as the cause
 */
@Override
public void open() throws InterpreterException {
  if (jupyterKernelClient != null) {
    // JupyterKernelInterpreter might have been opened already
    return;
  }
  try {
    String envName = getProperty("zeppelin.interpreter.conda.env.name");
    if (StringUtils.isNotBlank(envName)) {
      pythonExecutable = activateCondaEnv(envName);
    } else {
      pythonExecutable = getProperty("zeppelin.python", "python");
    }
    LOGGER.info("Python Exec: {}", pythonExecutable);
    String checkPrerequisiteResult = checkKernelPrerequisite(pythonExecutable);
    if (!StringUtils.isEmpty(checkPrerequisiteResult)) {
      throw new InterpreterException("Kernel prerequisite is not meet: " +
          checkPrerequisiteResult);
    }
    kernelLaunchTimeout = Integer.parseInt(
        getProperty("zeppelin.jupyter.kernel.launch.timeout", "30000"));
    this.z = buildZeppelinContext();
    int kernelPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
    int messageSize = Integer.parseInt(getProperty("zeppelin.jupyter.kernel.grpc.message_size",
        32 * 1024 * 1024 + ""));
    // The client has to exist before the launch because the launcher polls the kernel
    // status through it while waiting for the kernel to come up.
    jupyterKernelClient = new JupyterKernelClient(ManagedChannelBuilder.forAddress("127.0.0.1",
        kernelPort).usePlaintext().maxInboundMessageSize(messageSize),
        getProperties(), kernel);
    launchJupyterKernel(kernelPort);
  } catch (Exception e) {
    // Without this, the non-null client would make the next open() return early even though
    // no kernel is running, and the channel / half-started process would leak.
    releaseKernelAfterFailedOpen();
    throw new InterpreterException("Fail to open JupyterKernelInterpreter:\n" +
        ExceptionUtils.getStackTrace(e), e);
  }
}

/** Best-effort release of the kernel process and client left behind by a failed open(). */
private void releaseKernelAfterFailedOpen() {
  if (jupyterKernelProcessLauncher != null) {
    try {
      jupyterKernelProcessLauncher.stop();
    } catch (RuntimeException e) {
      LOGGER.warn("Fail to stop Jupyter Kernel process after failed open", e);
    }
    jupyterKernelProcessLauncher = null;
  }
  if (jupyterKernelClient != null) {
    try {
      jupyterKernelClient.shutdown();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      LOGGER.warn("Interrupted when shutting down jupyter kernel client after failed open", e);
    } catch (RuntimeException e) {
      LOGGER.warn("Fail to shut down jupyter kernel client after failed open", e);
    }
    jupyterKernelClient = null;
  }
}
/**
 * Checks that the python environment can host the kernel by running
 * {@code <pythonExec> -m pip freeze} and testing its output against
 * {@link #getRequiredPackagesPredicates()}.
 *
 * <p>A non-empty return value describes why the prerequisite check failed;
 * an empty string means the prerequisite is met.
 *
 * @param pythonExec python executable used to run pip
 * @return the error description, or an empty string when all required packages are found
 */
public String checkKernelPrerequisite(String pythonExec) {
  LOGGER.info("checkKernelPrerequisite using python executable: {}", pythonExec);
  ProcessBuilder processBuilder = new ProcessBuilder(pythonExec, "-m", "pip", "freeze");
  File stderrFile = null;
  File stdoutFile = null;
  Process proc = null;
  try {
    // pip's streams are redirected to temp files so the child can never block on a full pipe
    stderrFile = File.createTempFile("zeppelin", ".txt");
    processBuilder.redirectError(stderrFile);
    stdoutFile = File.createTempFile("zeppelin", ".txt");
    processBuilder.redirectOutput(stdoutFile);
    proc = processBuilder.start();
    int ret = proc.waitFor();
    if (ret != 0) {
      try (FileInputStream in = new FileInputStream(stderrFile)) {
        return "Fail to run pip freeze.\n" + IOUtils.toString(in, StandardCharsets.UTF_8);
      }
    }
    try (FileInputStream in = new FileInputStream(stdoutFile)) {
      String freezeOutput = IOUtils.toString(in, StandardCharsets.UTF_8);
      for (PythonPackagePredicate<String> packagePredicate : getRequiredPackagesPredicates()) {
        if (!packagePredicate.test(freezeOutput)) {
          return packagePredicate + " is not installed, installed packages:\n" + freezeOutput;
        }
      }
      LOGGER.info("Prerequisite for kernel {} is met", getKernelName());
    }
  } catch (InterruptedException e) {
    // Keep the interruption visible to the caller and do not leave pip running against
    // temp files that are about to be deleted.
    Thread.currentThread().interrupt();
    if (proc != null) {
      proc.destroy();
    }
    LOGGER.warn("Fail to checkKernelPrerequisite", e);
    return "Fail to checkKernelPrerequisite: " + ExceptionUtils.getStackTrace(e);
  } catch (Exception e) {
    LOGGER.warn("Fail to checkKernelPrerequisite", e);
    return "Fail to checkKernelPrerequisite: " + ExceptionUtils.getStackTrace(e);
  } finally {
    FileUtils.deleteQuietly(stderrFile);
    FileUtils.deleteQuietly(stdoutFile);
  }
  return "";
}
/**
 * Activates the conda environment located at {@code envName} by running a generated shell
 * script (chmod, {@code source bin/activate}, {@code conda-unpack}) and records it in
 * {@code condaEnv}.
 *
 * @param envName path of the conda environment folder
 * @return path of the python executable inside the environment ({@code envName/bin/python})
 * @throws IOException if the environment folder does not exist, the script cannot be written
 *         or executed, or it finishes unsuccessfully; the script output is part of the message
 */
private String activateCondaEnv(String envName) throws IOException {
  LOGGER.info("Activating conda env: {}", envName);
  if (!new File(envName).exists()) {
    throw new IOException("Fail to activating conda env because no environment folder: " +
        envName);
  }
  File scriptFile = Files.createTempFile("zeppelin_jupyter_kernel_", ".sh").toFile();
  try {
    try (FileWriter writer = new FileWriter(scriptFile)) {
      IOUtils.write(String.format("chmod 777 -R %s \nsource %s/bin/activate \nconda-unpack",
          envName, envName),
          writer);
    }
    scriptFile.setExecutable(true, false);
    scriptFile.setReadable(true, false);

    ByteArrayOutputStream stdout = new ByteArrayOutputStream();
    DefaultExecutor executor = new DefaultExecutor();
    executor.setStreamHandler(new PumpStreamHandler(stdout));
    CommandLine cmd = new CommandLine(scriptFile.getAbsolutePath());
    int exitCode;
    try {
      exitCode = executor.execute(cmd);
    } catch (IOException e) {
      // DefaultExecutor reports an unexpected exit value by throwing, which would otherwise
      // discard the captured script output -- the only hint about what went wrong.
      throw new IOException("Fail to activate conda env, " + stdout.toString(), e);
    }
    if (exitCode != 0) {
      throw new IOException("Fail to activate conda env, " + stdout.toString());
    }
    LOGGER.info("Activate conda env successfully");
    this.condaEnv = envName;
    return envName + "/bin/python";
  } finally {
    // the script is only needed for this single run; do not leave it in the temp folder
    FileUtils.deleteQuietly(scriptFile);
  }
}
/**
 * Copies the bundled kernel server scripts into a fresh temp directory, starts
 * {@code kernel_server.py <kernel name> <port>} with the configured python executable and
 * waits until the kernel reports that it is running.
 *
 * @param kernelPort port the kernel server is told to use
 * @throws IOException if a kernel script is missing from the classpath, the scripts cannot be
 *         copied, the kernel does not come up within {@code kernelLaunchTimeout} milliseconds,
 *         or the python process is not running after the wait
 */
private void launchJupyterKernel(int kernelPort)
    throws IOException {
  LOGGER.info("Launching Jupyter Kernel at port: {}", kernelPort);
  // copy the python scripts to a temp directory, then launch jupyter kernel in that folder
  this.kernelWorkDir = Files.createTempDirectory(
      "zeppelin_jupyter_kernel_" + getKernelName()).toFile();
  String[] kernelScripts = {"kernel_server.py", "kernel_pb2.py", "kernel_pb2_grpc.py"};
  for (String kernelScript : kernelScripts) {
    String resourcePath = "grpc/jupyter" + "/" + kernelScript;
    URL url = getClass().getClassLoader().getResource(resourcePath);
    if (url == null) {
      // getResource() signals "not found" with null; fail with a message that names the
      // missing script instead of a bare NullPointerException from the copy below
      throw new IOException("Fail to find kernel script on classpath: " + resourcePath);
    }
    FileUtils.copyURLToFile(url, new File(kernelWorkDir, kernelScript));
  }

  CommandLine cmd = CommandLine.parse(pythonExecutable);
  cmd.addArgument(kernelWorkDir.getAbsolutePath() + "/kernel_server.py");
  cmd.addArgument(getKernelName());
  cmd.addArgument(String.valueOf(kernelPort));
  Map<String, String> envs = setupKernelEnv();

  jupyterKernelProcessLauncher = new JupyterKernelProcessLauncher(cmd, envs);
  jupyterKernelProcessLauncher.launch();
  jupyterKernelProcessLauncher.waitForReady(kernelLaunchTimeout);
  if (jupyterKernelProcessLauncher.isLaunchTimeout()) {
    throw new IOException("Fail to launch Jupyter Kernel in " + kernelLaunchTimeout / 1000
        + " seconds.\n" + jupyterKernelProcessLauncher.getErrorMessage());
  }
  if (!jupyterKernelProcessLauncher.isRunning()) {
    throw new IOException("Fail to launch Jupyter Kernel as the python process is failed.\n"
        + jupyterKernelProcessLauncher.getErrorMessage());
  }
}
/**
 * Supplies the environment variables handed to the kernel process launcher.
 * This implementation returns whatever {@code EnvironmentUtils.getProcEnvironment()} yields;
 * the method is protected so subclasses can provide a different environment.
 *
 * @return environment variable map for the kernel process
 * @throws IOException if the environment cannot be obtained
 */
protected Map<String, String> setupKernelEnv() throws IOException {
return EnvironmentUtils.getProcEnvironment();
}
/**
 * @return the launcher of the kernel process; null before the kernel has been launched
 *         and again after {@link #close()}
 */
public JupyterKernelProcessLauncher getKernelProcessLauncher() {
return jupyterKernelProcessLauncher;
}
/**
 * Shuts the kernel down: asks a still-running kernel to stop, shuts down the kernel client
 * and finally stops the kernel process. Does nothing when no kernel process was launched.
 *
 * <p>The process is stopped even if the stop request to the kernel fails, and both the
 * launcher and the client are cleared so that {@link #open()} can start a fresh kernel
 * afterwards.
 *
 * @throws InterpreterException declared by the interpreter contract; not thrown here
 */
@Override
public void close() throws InterpreterException {
  if (jupyterKernelProcessLauncher == null) {
    return;
  }
  LOGGER.info("Shutdown Jupyter Kernel Process");
  try {
    if (jupyterKernelClient != null) {
      try {
        // only a live kernel can answer the stop request
        if (jupyterKernelProcessLauncher.isRunning()) {
          jupyterKernelClient.stop(StopRequest.newBuilder().build());
        }
      } finally {
        // release the client regardless of whether the kernel was still alive
        try {
          jupyterKernelClient.shutdown();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          LOGGER.warn("Exception happens when shutting down jupyter kernel client", e);
        }
        // open() treats a non-null client as "already opened"
        jupyterKernelClient = null;
      }
    }
  } finally {
    // must run even when the stop request above throws, otherwise the process is leaked
    jupyterKernelProcessLauncher.stop();
    jupyterKernelProcessLauncher = null;
    LOGGER.info("Jupyter Kernel is killed");
  }
}
/**
 * Executes {@code st} on the Jupyter kernel, streaming the kernel output into the output of
 * {@code context}.
 *
 * @param st code to execute
 * @param context context of the paragraph being run
 * @return a result carrying the status code reported by the kernel, or an ERROR result when
 *         the kernel process is found to be no longer running after the execution
 * @throws InterpreterException wrapping any exception raised while executing the code
 */
@Override
public InterpreterResult internalInterpret(String st,
                                           InterpreterContext context)
    throws InterpreterException {
  // point the shared helpers at the paragraph that is being run
  z.setGui(context.getGui());
  z.setNoteGui(context.getNoteGui());
  z.setInterpreterContext(context);
  interpreterOutput.setInterpreterOutput(context.out);
  jupyterKernelClient.setInterpreterContext(context);

  try {
    ExecuteRequest request = ExecuteRequest.newBuilder().setCode(st).build();
    ExecuteResponse response = jupyterKernelClient.stream_execute(request, interpreterOutput);
    interpreterOutput.getInterpreterOutput().flush();

    // When the kernel process exits, stream_execute and onProcessFailed run on two different
    // threads and either may finish first. So if the client suspects a kernel failure, give
    // the launcher one second to notice the exit before its state is consulted again.
    boolean kernelLooksHealthy =
        jupyterKernelProcessLauncher.isRunning() && !jupyterKernelClient.isMaybeKernelFailed();
    if (!kernelLooksHealthy) {
      if (jupyterKernelClient.isMaybeKernelFailed()) {
        Thread.sleep(1000);
      }
      if (!jupyterKernelProcessLauncher.isRunning()) {
        return new InterpreterResult(InterpreterResult.Code.ERROR,
            "IPython kernel is abnormally exited, please check your code and log.");
      }
    }
    String statusName = response.getStatus().name();
    return new InterpreterResult(InterpreterResult.Code.valueOf(statusName));
  } catch (Exception e) {
    throw new InterpreterException("Fail to interpret python code", e);
  }
}
/**
 * Sends a cancel request to the kernel through the kernel client.
 *
 * @param context context of the paragraph being cancelled; not used by this implementation
 */
@Override
public void cancel(InterpreterContext context) throws InterpreterException {
jupyterKernelClient.cancel(CancelRequest.newBuilder().build());
}
/**
 * @return always {@code FormType.SIMPLE}
 */
@Override
public FormType getFormType() {
return FormType.SIMPLE;
}
/**
 * Progress is not tracked by this interpreter.
 *
 * @param context context of the running paragraph; not used
 * @return always 0
 */
@Override
public int getProgress(InterpreterContext context) throws InterpreterException {
return 0;
}
/**
 * Asks the kernel for code completion candidates.
 *
 * @param buf code to complete
 * @param cursor cursor position inside {@code buf}
 * @param interpreterContext context of the paragraph; not used by this implementation
 * @return one completion per match returned by the kernel; for a match that contains dots
 *         only the part after the last dot is kept, and it serves as both name and value
 */
@Override
public List<InterpreterCompletion> completion(String buf, int cursor,
                                              InterpreterContext interpreterContext) {
  LOGGER.debug("Call completion for: {}, cursor: {}", buf, cursor);
  // newBuilder() is static; call it on the class like every other request built in this file
  CompletionRequest request =
      CompletionRequest.newBuilder().setCode(buf).setCursor(cursor).build();
  CompletionResponse response = jupyterKernelClient.complete(request);

  List<InterpreterCompletion> completions = new ArrayList<>(response.getMatchesCount());
  for (int i = 0; i < response.getMatchesCount(); i++) {
    String match = response.getMatches(i);
    // strip the qualifier: "a.b.c" is offered as "c"
    int lastIndexOfDot = match.lastIndexOf(".");
    if (lastIndexOfDot != -1) {
      match = match.substring(lastIndexOfDot + 1);
    }
    LOGGER.debug("Candidate completion: {}", match);
    completions.add(new InterpreterCompletion(match, match, ""));
  }
  return completions;
}
/**
 * @return the context built during {@link #open()}; null if the interpreter has not been
 *         opened far enough for the context to be created
 */
@Override
public ZeppelinContext getZeppelinContext() {
return z;
}
/**
 * Process launcher for the kernel server. Readiness is detected by polling the kernel status
 * through the enclosing interpreter's {@code jupyterKernelClient}.
 */
public class JupyterKernelProcessLauncher extends ProcessLauncher {

  JupyterKernelProcessLauncher(CommandLine commandLine,
                               Map<String, String> envs) {
    super(commandLine, envs);
  }

  /**
   * Polls the kernel roughly every 100 ms while the launcher is in the LAUNCHED state.
   * Calls {@code onProcessRunning()} once the kernel reports RUNNING, or {@code onTimeout()}
   * once more than {@code timeout} milliseconds have passed since this method was entered.
   *
   * @param timeout maximum time to wait, in milliseconds
   */
  @Override
  public void waitForReady(int timeout) {
    // point in time after which the launch counts as timed out
    final long deadline = System.currentTimeMillis() + timeout;
    while (state == State.LAUNCHED) {
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        LOGGER.error("Interrupted by something", e);
      }

      try {
        StatusRequest statusRequest = StatusRequest.newBuilder().build();
        StatusResponse kernelStatus = jupyterKernelClient.status(statusRequest);
        if (kernelStatus.getStatus() != KernelStatus.RUNNING) {
          LOGGER.info("Wait for Jupyter Kernel to be started");
        } else {
          LOGGER.info("Jupyter Kernel is Running");
          onProcessRunning();
          break;
        }
      } catch (Exception e) {
        // expected while the grpc server inside the kernel has not started yet, so ignore it
        LOGGER.info("Wait for Jupyter Kernel to be started");
      }

      if (System.currentTimeMillis() > deadline) {
        onTimeout();
        break;
      }
    }
  }
}
}