blob: a9c86c4ef70f0bb2ae59089ad418a94be51fd5c1 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.zeppelin.interpreter.remote;
import java.util.*;
import org.apache.thrift.TException;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.display.Input;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* Proxy for Interpreter instance that runs on separate process
public class RemoteInterpreter extends Interpreter {
private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class);
private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
private final ApplicationEventListener applicationEventListener;
private Gson gson = new Gson();
private String interpreterRunner;
private String interpreterPath;
private String localRepoPath;
private String className;
private String sessionKey;
private FormType formType;
private boolean initialized;
private Map<String, String> env;
private int connectTimeout;
private int maxPoolSize;
private String host;
private int port;
private String userName;
private Boolean isUserImpersonate;
private int outputLimit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;
* Remote interpreter and manage interpreter process
public RemoteInterpreter(Properties property, String sessionKey, String className,
String interpreterRunner, String interpreterPath, String localRepoPath, int connectTimeout,
int maxPoolSize, RemoteInterpreterProcessListener remoteInterpreterProcessListener,
ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
int outputLimit) {
this.sessionKey = sessionKey;
this.className = className;
initialized = false;
this.interpreterRunner = interpreterRunner;
this.interpreterPath = interpreterPath;
this.localRepoPath = localRepoPath;
env = getEnvFromInterpreterProperty(property);
this.connectTimeout = connectTimeout;
this.maxPoolSize = maxPoolSize;
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
this.applicationEventListener = appListener;
this.userName = userName;
this.isUserImpersonate = isUserImpersonate;
this.outputLimit = outputLimit;
* Connect to existing process
public RemoteInterpreter(Properties property, String sessionKey, String className, String host,
int port, String localRepoPath, int connectTimeout, int maxPoolSize,
RemoteInterpreterProcessListener remoteInterpreterProcessListener,
ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
int outputLimit) {
this.sessionKey = sessionKey;
this.className = className;
initialized = false; = host;
this.port = port;
this.localRepoPath = localRepoPath;
this.connectTimeout = connectTimeout;
this.maxPoolSize = maxPoolSize;
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
this.applicationEventListener = appListener;
this.userName = userName;
this.isUserImpersonate = isUserImpersonate;
this.outputLimit = outputLimit;
// VisibleForTesting
public RemoteInterpreter(Properties property, String sessionKey, String className,
String interpreterRunner, String interpreterPath, String localRepoPath,
Map<String, String> env, int connectTimeout,
RemoteInterpreterProcessListener remoteInterpreterProcessListener,
ApplicationEventListener appListener, String userName, Boolean isUserImpersonate) {
this.className = className;
this.sessionKey = sessionKey;
this.interpreterRunner = interpreterRunner;
this.interpreterPath = interpreterPath;
this.localRepoPath = localRepoPath;
this.env = env;
this.connectTimeout = connectTimeout;
this.maxPoolSize = 10;
this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
this.applicationEventListener = appListener;
this.userName = userName;
this.isUserImpersonate = isUserImpersonate;
private Map<String, String> getEnvFromInterpreterProperty(Properties property) {
Map<String, String> env = new HashMap<>();
for (Object key : property.keySet()) {
if (isEnvString((String) key)) {
env.put((String) key, property.getProperty((String) key));
return env;
static boolean isEnvString(String key) {
if (key == null || key.length() == 0) {
return false;
return key.matches("^[A-Z_0-9]*");
public String getClassName() {
return className;
private boolean connectToExistingProcess() {
return host != null && port > 0;
public RemoteInterpreterProcess getInterpreterProcess() {
InterpreterGroup intpGroup = getInterpreterGroup();
if (intpGroup == null) {
return null;
synchronized (intpGroup) {
if (intpGroup.getRemoteInterpreterProcess() == null) {
RemoteInterpreterProcess remoteProcess;
if (connectToExistingProcess()) {
remoteProcess = new RemoteInterpreterRunningProcess(
} else {
// create new remote process
remoteProcess = new RemoteInterpreterManagedProcess(
interpreterRunner, interpreterPath, localRepoPath, env, connectTimeout,
remoteInterpreterProcessListener, applicationEventListener);
return intpGroup.getRemoteInterpreterProcess();
public synchronized void init() {
if (initialized == true) {
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
final InterpreterGroup interpreterGroup = getInterpreterGroup();
Math.max(this.maxPoolSize, interpreterProcess.getMaxPoolSize()));
String groupId = interpreterGroup.getId();
synchronized (interpreterProcess) {
Client client = null;
try {
client = interpreterProcess.getClient();
} catch (Exception e1) {
throw new InterpreterException(e1);
boolean broken = false;
try {"Create remote interpreter {}", getClassName());
if (localRepoPath != null) {
property.put("zeppelin.interpreter.localRepo", localRepoPath);
property.put("zeppelin.interpreter.output.limit", Integer.toString(outputLimit));
client.createInterpreter(groupId, sessionKey,
getClassName(), (Map) property, userName);
// Push angular object loaded from JSON file to remote interpreter
if (!interpreterGroup.isAngularRegistryPushed()) {
} catch (TException e) {
logger.error("Failed to create interpreter: {}", getClassName());
throw new InterpreterException(e);
} finally {
// TODO(jongyoul): Fixed it when not all of interpreter in same interpreter group are broken
interpreterProcess.releaseClient(client, broken);
initialized = true;
public void open() {
InterpreterGroup interpreterGroup = getInterpreterGroup();
synchronized (interpreterGroup) {
// initialize all interpreters in this interpreter group
List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
// TODO(jl): this open method is called by It, however,
// initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it
// doesn't call open method, it's not open. It causes problem while running intp.close()
// In case of Spark, this method initializes all of interpreters and init() method increases
// reference count of RemoteInterpreterProcess. But while closing this interpreter group, all
// other interpreters doesn't do anything because those LazyInterpreters aren't open.
// But for now, we have to initialise all of interpreters for some reasons.
// See Interpreter.getInterpreterInTheSameSessionByClassName(String)
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
if (!initialized) {
// reference per session
interpreterProcess.reference(interpreterGroup, userName, isUserImpersonate);
for (Interpreter intp : new ArrayList<>(interpreters)) {
Interpreter p = intp;
while (p instanceof WrappedInterpreter) {
p = ((WrappedInterpreter) p).getInnerInterpreter();
try {
((RemoteInterpreter) p).init();
} catch (InterpreterException e) {
logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup",
public void close() {
InterpreterGroup interpreterGroup = getInterpreterGroup();
synchronized (interpreterGroup) {
// close all interpreters in this session
List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
// TODO(jl): this open method is called by It, however,
// initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it
// doesn't call open method, it's not open. It causes problem while running intp.close()
// In case of Spark, this method initializes all of interpreters and init() method increases
// reference count of RemoteInterpreterProcess. But while closing this interpreter group, all
// other interpreters doesn't do anything because those LazyInterpreters aren't open.
// But for now, we have to initialise all of interpreters for some reasons.
// See Interpreter.getInterpreterInTheSameSessionByClassName(String)
if (initialized) {
// dereference per session
for (Interpreter intp : new ArrayList<>(interpreters)) {
Interpreter p = intp;
while (p instanceof WrappedInterpreter) {
p = ((WrappedInterpreter) p).getInnerInterpreter();
try {
((RemoteInterpreter) p).closeInterpreter();
} catch (InterpreterException e) {
logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup",
public void closeInterpreter() {
if (this.initialized == false) {
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
Client client = null;
boolean broken = false;
try {
client = interpreterProcess.getClient();
if (client != null) {
client.close(sessionKey, className);
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
} catch (Exception e1) {
throw new InterpreterException(e1);
} finally {
if (client != null) {
interpreterProcess.releaseClient(client, broken);
this.initialized = false;
public InterpreterResult interpret(String st, InterpreterContext context) {
if (logger.isDebugEnabled()) {
logger.debug("st:\n{}", st);
FormType form = getFormType();
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
Client client = null;
try {
client = interpreterProcess.getClient();
} catch (Exception e1) {
throw new InterpreterException(e1);
InterpreterContextRunnerPool interpreterContextRunnerPool = interpreterProcess
List<InterpreterContextRunner> runners = context.getRunners();
if (runners != null && runners.size() != 0) {
// assume all runners in this InterpreterContext have the same note id
String noteId = runners.get(0).getNoteId();
interpreterContextRunnerPool.addAll(noteId, runners);
boolean broken = false;
try {
final GUI currentGUI = context.getGui();
RemoteInterpreterResult remoteResult = client.interpret(
sessionKey, className, st, convert(context));
Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson(
remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
if (form == FormType.NATIVE) {
GUI remoteGui = gson.fromJson(remoteResult.getGui(), GUI.class);
} else if (form == FormType.SIMPLE) {
final Map<String, Input> currentForms = currentGUI.getForms();
final Map<String, Object> currentParams = currentGUI.getParams();
final GUI remoteGUI = gson.fromJson(remoteResult.getGui(), GUI.class);
final Map<String, Input> remoteForms = remoteGUI.getForms();
final Map<String, Object> remoteParams = remoteGUI.getParams();
InterpreterResult result = convert(remoteResult);
return result;
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
} finally {
interpreterProcess.releaseClient(client, broken);
public void cancel(InterpreterContext context) {
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
Client client = null;
try {
client = interpreterProcess.getClient();
} catch (Exception e1) {
throw new InterpreterException(e1);
boolean broken = false;
try {
client.cancel(sessionKey, className, convert(context));
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
} finally {
interpreterProcess.releaseClient(client, broken);
public FormType getFormType() {
if (formType != null) {
return formType;
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
Client client = null;
try {
client = interpreterProcess.getClient();
} catch (Exception e1) {
throw new InterpreterException(e1);
boolean broken = false;
try {
formType = FormType.valueOf(client.getFormType(sessionKey, className));
return formType;
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
} finally {
interpreterProcess.releaseClient(client, broken);
public int getProgress(InterpreterContext context) {
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
if (interpreterProcess == null || !interpreterProcess.isRunning()) {
return 0;
Client client = null;
try {
client = interpreterProcess.getClient();
} catch (Exception e1) {
throw new InterpreterException(e1);
boolean broken = false;
try {
return client.getProgress(sessionKey, className, convert(context));
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
} finally {
interpreterProcess.releaseClient(client, broken);
public List<InterpreterCompletion> completion(String buf, int cursor) {
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
Client client = null;
try {
client = interpreterProcess.getClient();
} catch (Exception e1) {
throw new InterpreterException(e1);
boolean broken = false;
try {
List completion = client.completion(sessionKey, className, buf, cursor);
return completion;
} catch (TException e) {
broken = true;
throw new InterpreterException(e);
} finally {
interpreterProcess.releaseClient(client, broken);
public Scheduler getScheduler() {
int maxConcurrency = maxPoolSize;
RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
if (interpreterProcess == null) {
return null;
} else {
return SchedulerFactory.singleton().createOrGetRemoteScheduler(
RemoteInterpreter.class.getName() + sessionKey + interpreterProcess.hashCode(),
sessionKey, interpreterProcess, maxConcurrency);
private String getInterpreterGroupKey(InterpreterGroup interpreterGroup) {
return interpreterGroup.getId();
private RemoteInterpreterContext convert(InterpreterContext ic) {
return new RemoteInterpreterContext(ic.getNoteId(), ic.getParagraphId(), ic.getReplName(),
ic.getParagraphTitle(), ic.getParagraphText(), gson.toJson(ic.getAuthenticationInfo()),
gson.toJson(ic.getConfig()), gson.toJson(ic.getGui()), gson.toJson(ic.getRunners()));
private InterpreterResult convert(RemoteInterpreterResult result) {
InterpreterResult r = new InterpreterResult(
for (RemoteInterpreterResultMessage m : result.getMsg()) {
r.add(InterpreterResult.Type.valueOf(m.getType()), m.getData());
return r;
* Push local angular object registry to
* remote interpreter. This method should be
* call ONLY inside the init() method
void pushAngularObjectRegistryToRemote(Client client) throws TException {
final AngularObjectRegistry angularObjectRegistry = this.getInterpreterGroup()
if (angularObjectRegistry != null && angularObjectRegistry.getRegistry() != null) {
final Map<String, Map<String, AngularObject>> registry = angularObjectRegistry
.getRegistry();"Push local angular object registry from ZeppelinServer to" +
" remote interpreter group {}", this.getInterpreterGroup().getId());
final java.lang.reflect.Type registryType = new TypeToken<Map<String,
Map<String, AngularObject>>>() {
Gson gson = new Gson();
client.angularRegistryPush(gson.toJson(registry, registryType));
public Map<String, String> getEnv() {
return env;
public void setEnv(Map<String, String> env) {
this.env = env;
public void addEnv(Map<String, String> env) {
if (this.env == null) {
this.env = new HashMap<>();
//Only for test
public String getInterpreterRunner() {
return interpreterRunner;