blob: 8e881f6376466cf11c7afd8265a19f431c1ca5ea [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.python;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.interpreter.ZeppelinContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteRequest;
import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteResponse;
import org.apache.zeppelin.interpreter.jupyter.proto.ExecuteStatus;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
import org.apache.zeppelin.jupyter.JupyterKernelInterpreter;
import org.apache.zeppelin.jupyter.PythonPackagePredicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import py4j.GatewayServer;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
* IPython Interpreter for Zeppelin. It enhances the JupyterKernelInterpreter by setting up
* communication between JVM and Python process via py4j. So that in IPythonInterpreter
* you can use ZeppelinContext.
*/
public class IPythonInterpreter extends JupyterKernelInterpreter {
private static final Logger LOGGER = LoggerFactory.getLogger(IPythonInterpreter.class);
// GatewayServer in jvm side to communicate with python kernel process.
private GatewayServer gatewayServer;
// allow to set PYTHONPATH
private String additionalPythonPath;
private String additionalPythonInitFile;
private boolean useBuiltinPy4j = true;
private boolean usePy4JAuth = true;
private String py4jGatewaySecret;
public IPythonInterpreter(Properties properties) {
super("python", properties);
}
@Override
public String getKernelName() {
return "python";
}
@Override
public List<PythonPackagePredicate<String>> getRequiredPackagesPredicates() {
List<PythonPackagePredicate<String>> requiredPackages = super.getRequiredPackagesPredicates();
requiredPackages.add(
new PythonPackagePredicate<>("ipython",
packages -> packages.contains("ipython ") ||
packages.contains("ipython=")));
requiredPackages.add(
new PythonPackagePredicate<>("ipykernel",
packages -> packages.contains("ipykernel ") ||
packages.contains("ipykernel=")));
return requiredPackages;
}
/**
* Sub class can customize the interpreter by adding more python packages under PYTHONPATH.
* e.g. IPySparkInterpreter
*
* @param additionalPythonPath
*/
public void setAdditionalPythonPath(String additionalPythonPath) {
this.additionalPythonPath = additionalPythonPath;
}
/**
* Sub class can customize the interpreter by running additional python init code.
* e.g. IPySparkInterpreter
*
* @param additionalPythonInitFile
*/
public void setAdditionalPythonInitFile(String additionalPythonInitFile) {
this.additionalPythonInitFile = additionalPythonInitFile;
}
public void setUseBuiltinPy4j(boolean useBuiltinPy4j) {
this.useBuiltinPy4j = useBuiltinPy4j;
}
@Override
public ZeppelinContext buildZeppelinContext() {
return new PythonZeppelinContext(
getInterpreterGroup().getInterpreterHookRegistry(),
Integer.parseInt(getProperty("zeppelin.python.maxResult", "1000")));
}
@Override
public void open() throws InterpreterException {
super.open();
try {
String gatewayHost = PythonUtils.getLocalIP(properties);
int gatewayPort = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
setupJVMGateway(gatewayHost, gatewayPort);
initPythonInterpreter(gatewayHost, gatewayPort);
} catch (Exception e) {
LOGGER.error("Fail to open IPythonInterpreter", e);
throw new InterpreterException(e);
}
}
private void setupJVMGateway(String gatewayHost, int gatewayPort) throws IOException {
this.gatewayServer = PythonUtils.createGatewayServer(this, gatewayHost,
gatewayPort, py4jGatewaySecret, usePy4JAuth);
gatewayServer.start();
}
private void initPythonInterpreter(String gatewayHost, int gatewayPort) throws IOException {
InputStream input =
getClass().getClassLoader().getResourceAsStream("python/zeppelin_ipython.py");
List<String> lines = IOUtils.readLines(input, StandardCharsets.UTF_8);
ExecuteResponse response = jupyterKernelClient.block_execute(ExecuteRequest.newBuilder()
.setCode(StringUtils.join(lines, System.lineSeparator())
.replace("${JVM_GATEWAY_PORT}", gatewayPort + "")
.replace("${JVM_GATEWAY_ADDRESS}", gatewayHost)).build());
if (response.getStatus() != ExecuteStatus.SUCCESS) {
throw new IOException("Fail to setup JVMGateway\n" + response.getOutput());
}
input =
getClass().getClassLoader().getResourceAsStream("python/zeppelin_context.py");
lines = IOUtils.readLines(input, StandardCharsets.UTF_8);
response = jupyterKernelClient.block_execute(ExecuteRequest.newBuilder()
.setCode(StringUtils.join(lines, System.lineSeparator())).build());
if (response.getStatus() != ExecuteStatus.SUCCESS) {
throw new IOException("Fail to import ZeppelinContext\n" + response.getOutput());
}
response = jupyterKernelClient.block_execute(ExecuteRequest.newBuilder()
.setCode("z = __zeppelin__ = PyZeppelinContext(intp.getZeppelinContext(), gateway)")
.build());
if (response.getStatus() != ExecuteStatus.SUCCESS) {
throw new IOException("Fail to setup ZeppelinContext\n" + response.getOutput());
}
if (additionalPythonInitFile != null) {
input = getClass().getClassLoader().getResourceAsStream(additionalPythonInitFile);
lines = IOUtils.readLines(input, StandardCharsets.UTF_8);
response = jupyterKernelClient.block_execute(ExecuteRequest.newBuilder()
.setCode(StringUtils.join(lines, System.lineSeparator())
.replace("${JVM_GATEWAY_PORT}", gatewayPort + "")
.replace("${JVM_GATEWAY_ADDRESS}", gatewayHost)).build());
if (response.getStatus() != ExecuteStatus.SUCCESS) {
LOGGER.error("Fail to run additional Python init file\n{}", response.getOutput());
throw new IOException("Fail to run additional Python init file: "
+ additionalPythonInitFile + "\n" + response.getOutput());
}
}
}
@Override
protected Map<String, String> setupKernelEnv() throws IOException {
Map<String, String> envs = super.setupKernelEnv();
if (useBuiltinPy4j) {
//TODO(zjffdu) don't do hard code on py4j here
File py4jDestFile = new File(kernelWorkDir, "py4j-src-0.10.7.zip");
FileUtils.copyURLToFile(getClass().getClassLoader().getResource(
"python/py4j-src-0.10.7.zip"), py4jDestFile);
if (additionalPythonPath != null) {
// put the py4j at the end, because additionalPythonPath may already contain py4j.
// e.g. IPySparkInterpreter
additionalPythonPath = additionalPythonPath + ":" + py4jDestFile.getAbsolutePath();
} else {
additionalPythonPath = py4jDestFile.getAbsolutePath();
}
}
if (envs.containsKey("PYTHONPATH")) {
if (additionalPythonPath != null) {
envs.put("PYTHONPATH", additionalPythonPath + ":" + envs.get("PYTHONPATH"));
}
} else {
envs.put("PYTHONPATH", additionalPythonPath);
}
this.usePy4JAuth = Boolean.parseBoolean(getProperty("zeppelin.py4j.useAuth", "true"));
this.py4jGatewaySecret = PythonUtils.createSecret(256);
if (usePy4JAuth) {
envs.put("PY4J_GATEWAY_SECRET", this.py4jGatewaySecret);
}
LOGGER.info("PYTHONPATH: {}", envs.get("PYTHONPATH"));
return envs;
}
@Override
public void close() throws InterpreterException {
super.close();
if (gatewayServer != null) {
LOGGER.info("Shutdown Py4j GatewayServer");
gatewayServer.shutdown();
gatewayServer = null;
}
}
}