blob: 297024af86be33617e3c6878b50ef53edc16bd04 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.remote;
import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.thrift.TException;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.display.Input;
import org.apache.zeppelin.interpreter.ConfInterpreter;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.LifecycleManager;
import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.RemoteScheduler;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
* Proxy for Interpreter instance that runs on separate process
*/
public class RemoteInterpreter extends Interpreter {
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreter.class);
private static final Gson gson = new Gson();
private String className;
private String sessionId;
private FormType formType;
private RemoteInterpreterProcess interpreterProcess;
private volatile boolean isOpened = false;
private volatile boolean isCreated = false;
private LifecycleManager lifecycleManager;
/**
* Remote interpreter and manage interpreter process
*/
public RemoteInterpreter(Properties properties,
String sessionId,
String className,
String userName,
LifecycleManager lifecycleManager) {
super(properties);
this.sessionId = sessionId;
this.className = className;
this.setUserName(userName);
this.lifecycleManager = lifecycleManager;
}
public boolean isOpened() {
return isOpened;
}
@VisibleForTesting
public void setOpened(boolean opened) {
isOpened = opened;
}
@Override
public String getClassName() {
return className;
}
public String getSessionId() {
return this.sessionId;
}
public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() throws IOException {
if (this.interpreterProcess != null) {
return this.interpreterProcess;
}
ManagedInterpreterGroup intpGroup = getInterpreterGroup();
this.interpreterProcess = intpGroup.getOrCreateInterpreterProcess(getUserName(), properties);
return interpreterProcess;
}
public ManagedInterpreterGroup getInterpreterGroup() {
return (ManagedInterpreterGroup) super.getInterpreterGroup();
}
@Override
public void open() throws InterpreterException {
synchronized (this) {
if (!isOpened) {
// create all the interpreters of the same session first, then Open the internal interpreter
// of this RemoteInterpreter.
// The why we we create all the interpreter of the session is because some interpreter
// depends on other interpreter. e.g. PySparkInterpreter depends on SparkInterpreter.
// also see method Interpreter.getInterpreterInTheSameSessionByClassName
for (Interpreter interpreter : getInterpreterGroup()
.getOrCreateSession(this.getUserName(), sessionId)) {
try {
if (!(interpreter instanceof ConfInterpreter)) {
((RemoteInterpreter) interpreter).internal_create();
}
} catch (IOException e) {
throw new InterpreterException(e);
}
}
interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
@Override
public Void call(Client client) throws Exception {
LOGGER.info("Open RemoteInterpreter {}", getClassName());
// open interpreter here instead of in the jobRun method in RemoteInterpreterServer
// client.open(sessionId, className);
// Push angular object loaded from JSON file to remote interpreter
synchronized (getInterpreterGroup()) {
if (!getInterpreterGroup().isAngularRegistryPushed()) {
pushAngularObjectRegistryToRemote(client);
getInterpreterGroup().setAngularRegistryPushed(true);
}
}
return null;
}
});
isOpened = true;
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
}
}
}
private void internal_create() throws IOException {
synchronized (this) {
if (!isCreated) {
this.interpreterProcess = getOrCreateInterpreterProcess();
if (!interpreterProcess.isRunning()) {
throw new IOException("Interpreter process is not running:\n" +
interpreterProcess.getErrorMessage());
}
interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
@Override
public Void call(Client client) throws Exception {
LOGGER.info("Create RemoteInterpreter {}", getClassName());
client.createInterpreter(getInterpreterGroup().getId(), sessionId,
className, (Map) getProperties(), getUserName());
return null;
}
});
isCreated = true;
}
}
}
@Override
public void close() throws InterpreterException {
if (isOpened) {
RemoteInterpreterProcess interpreterProcess = null;
try {
interpreterProcess = getOrCreateInterpreterProcess();
} catch (IOException e) {
throw new InterpreterException(e);
}
interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
@Override
public Void call(Client client) throws Exception {
client.close(sessionId, className);
return null;
}
});
isOpened = false;
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
} else {
LOGGER.warn("close is called when RemoterInterpreter is not opened for " + className);
}
}
@Override
public InterpreterResult interpret(final String st, final InterpreterContext context)
throws InterpreterException {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("st:\n{}", st);
}
final FormType form = getFormType();
RemoteInterpreterProcess interpreterProcess = null;
try {
interpreterProcess = getOrCreateInterpreterProcess();
} catch (IOException e) {
throw new InterpreterException(e);
}
if (!interpreterProcess.isRunning()) {
throw new InterpreterException("Interpreter process is not running:\n" +
interpreterProcess.getErrorMessage());
}
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
return interpreterProcess.callRemoteFunction(
new RemoteInterpreterProcess.RemoteFunction<InterpreterResult>() {
@Override
public InterpreterResult call(Client client) throws Exception {
RemoteInterpreterResult remoteResult = client.interpret(
sessionId, className, st, convert(context));
Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson(
remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
}.getType());
context.getConfig().clear();
if (remoteConfig != null) {
context.getConfig().putAll(remoteConfig);
}
GUI currentGUI = context.getGui();
GUI currentNoteGUI = context.getNoteGui();
if (form == FormType.NATIVE) {
GUI remoteGui = GUI.fromJson(remoteResult.getGui());
GUI remoteNoteGui = GUI.fromJson(remoteResult.getNoteGui());
currentGUI.clear();
currentGUI.setParams(remoteGui.getParams());
currentGUI.setForms(remoteGui.getForms());
currentNoteGUI.setParams(remoteNoteGui.getParams());
currentNoteGUI.setForms(remoteNoteGui.getForms());
} else if (form == FormType.SIMPLE) {
final Map<String, Input> currentForms = currentGUI.getForms();
final Map<String, Object> currentParams = currentGUI.getParams();
final GUI remoteGUI = GUI.fromJson(remoteResult.getGui());
final Map<String, Input> remoteForms = remoteGUI.getForms();
final Map<String, Object> remoteParams = remoteGUI.getParams();
currentForms.putAll(remoteForms);
currentParams.putAll(remoteParams);
}
InterpreterResult result = convert(remoteResult);
return result;
}
}
);
}
@Override
public void cancel(final InterpreterContext context) throws InterpreterException {
if (!isOpened) {
LOGGER.warn("Cancel is called when RemoterInterpreter is not opened for " + className);
return;
}
RemoteInterpreterProcess interpreterProcess = null;
try {
interpreterProcess = getOrCreateInterpreterProcess();
} catch (IOException e) {
throw new InterpreterException(e);
}
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() {
@Override
public Void call(Client client) throws Exception {
client.cancel(sessionId, className, convert(context));
return null;
}
});
}
@Override
public FormType getFormType() throws InterpreterException {
if (formType != null) {
return formType;
}
// it is possible to call getFormType before it is opened
synchronized (this) {
if (!isOpened) {
open();
}
}
RemoteInterpreterProcess interpreterProcess = null;
try {
interpreterProcess = getOrCreateInterpreterProcess();
} catch (IOException e) {
throw new InterpreterException(e);
}
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
FormType type = interpreterProcess.callRemoteFunction(
new RemoteInterpreterProcess.RemoteFunction<FormType>() {
@Override
public FormType call(Client client) throws Exception {
formType = FormType.valueOf(client.getFormType(sessionId, className));
return formType;
}
});
return type;
}
@Override
public int getProgress(final InterpreterContext context) throws InterpreterException {
if (!isOpened) {
LOGGER.warn("getProgress is called when RemoterInterpreter is not opened for " + className);
return 0;
}
RemoteInterpreterProcess interpreterProcess = null;
try {
interpreterProcess = getOrCreateInterpreterProcess();
} catch (IOException e) {
throw new InterpreterException(e);
}
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
return interpreterProcess.callRemoteFunction(
new RemoteInterpreterProcess.RemoteFunction<Integer>() {
@Override
public Integer call(Client client) throws Exception {
return client.getProgress(sessionId, className, convert(context));
}
});
}
@Override
public List<InterpreterCompletion> completion(final String buf, final int cursor,
final InterpreterContext interpreterContext)
throws InterpreterException {
if (!isOpened) {
open();
}
RemoteInterpreterProcess interpreterProcess = null;
try {
interpreterProcess = getOrCreateInterpreterProcess();
} catch (IOException e) {
throw new InterpreterException(e);
}
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
return interpreterProcess.callRemoteFunction(
new RemoteInterpreterProcess.RemoteFunction<List<InterpreterCompletion>>() {
@Override
public List<InterpreterCompletion> call(Client client) throws Exception {
return client.completion(sessionId, className, buf, cursor,
convert(interpreterContext));
}
});
}
public String getStatus(final String jobId) {
if (!isOpened) {
LOGGER.warn("getStatus is called when RemoteInterpreter is not opened for " + className);
return Job.Status.UNKNOWN.name();
}
RemoteInterpreterProcess interpreterProcess = null;
try {
interpreterProcess = getOrCreateInterpreterProcess();
} catch (IOException e) {
throw new RuntimeException(e);
}
this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId);
return interpreterProcess.callRemoteFunction(
new RemoteInterpreterProcess.RemoteFunction<String>() {
@Override
public String call(Client client) throws Exception {
return client.getStatus(sessionId, jobId);
}
});
}
@Override
public Scheduler getScheduler() {
// one session own one Scheduler, so that when one session is closed, all the jobs/paragraphs
// running under the scheduler of this session will be aborted.
Scheduler s = new RemoteScheduler(
RemoteInterpreter.class.getSimpleName() + getInterpreterGroup().getId(),
SchedulerFactory.singleton().getExecutor(),
this);
return SchedulerFactory.singleton().createOrGetScheduler(s);
}
private RemoteInterpreterContext convert(InterpreterContext ic) {
return new RemoteInterpreterContext(ic.getNoteId(), ic.getNoteName(), ic.getParagraphId(),
ic.getReplName(), ic.getParagraphTitle(), ic.getParagraphText(),
gson.toJson(ic.getAuthenticationInfo()), gson.toJson(ic.getConfig()), ic.getGui().toJson(),
gson.toJson(ic.getNoteGui()),
ic.getLocalProperties());
}
private InterpreterResult convert(RemoteInterpreterResult result) {
InterpreterResult r = new InterpreterResult(
InterpreterResult.Code.valueOf(result.getCode()));
for (RemoteInterpreterResultMessage m : result.getMsg()) {
r.add(InterpreterResult.Type.valueOf(m.getType()), m.getData());
}
return r;
}
/**
* Push local angular object registry to
* remote interpreter. This method should be
* call ONLY once when the first Interpreter is created
*/
private void pushAngularObjectRegistryToRemote(Client client) throws TException {
final AngularObjectRegistry angularObjectRegistry = this.getInterpreterGroup()
.getAngularObjectRegistry();
if (angularObjectRegistry != null && angularObjectRegistry.getRegistry() != null) {
final Map<String, Map<String, AngularObject>> registry = angularObjectRegistry
.getRegistry();
LOGGER.info("Push local angular object registry from ZeppelinServer to" +
" remote interpreter group {}", this.getInterpreterGroup().getId());
final java.lang.reflect.Type registryType = new TypeToken<Map<String,
Map<String, AngularObject>>>() {
}.getType();
client.angularRegistryPush(gson.toJson(registry, registryType));
}
}
@Override
public String toString() {
return "RemoteInterpreter_" + className + "_" + sessionId;
}
}