blob: abda81e03c40b4534b8d2264caec70632fb44f0b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter.remote;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.thrift.TException;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.interpreter.InterpreterContextRunner;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.ManagedInterpreterGroup;
import org.apache.zeppelin.interpreter.RemoteZeppelinServerResource;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.apache.zeppelin.interpreter.thrift.ZeppelinServerResourceParagraphRunner;
import org.apache.zeppelin.resource.Resource;
import org.apache.zeppelin.resource.ResourceId;
import org.apache.zeppelin.resource.ResourcePool;
import org.apache.zeppelin.resource.ResourceSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* Processes message from RemoteInterpreter process
*/
public class RemoteInterpreterEventPoller extends Thread {
private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterEventPoller.class);
private final ScheduledExecutorService appendService =
Executors.newSingleThreadScheduledExecutor();
private final RemoteInterpreterProcessListener listener;
private final ApplicationEventListener appListener;
private volatile boolean shutdown;
private RemoteInterpreterProcess interpreterProcess;
private ManagedInterpreterGroup interpreterGroup;
Gson gson = new Gson();
public RemoteInterpreterEventPoller(
RemoteInterpreterProcessListener listener,
ApplicationEventListener appListener) {
this.listener = listener;
this.appListener = appListener;
shutdown = false;
}
public void setInterpreterProcess(RemoteInterpreterProcess interpreterProcess) {
this.interpreterProcess = interpreterProcess;
}
public void setInterpreterGroup(ManagedInterpreterGroup interpreterGroup) {
this.interpreterGroup = interpreterGroup;
}
@Override
public void run() {
AppendOutputRunner runner = new AppendOutputRunner(listener);
ScheduledFuture<?> appendFuture = appendService.scheduleWithFixedDelay(
runner, 0, AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS);
while (!shutdown) {
// wait and retry
if (!interpreterProcess.isRunning()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// nothing to do
}
continue;
}
RemoteInterpreterEvent event = interpreterProcess.callRemoteFunction(
new RemoteInterpreterProcess.RemoteFunction<RemoteInterpreterEvent>() {
@Override
public RemoteInterpreterEvent call(Client client) throws Exception {
return client.getEvent();
}
}
);
AngularObjectRegistry angularObjectRegistry = interpreterGroup.getAngularObjectRegistry();
try {
if (event.getType() != RemoteInterpreterEventType.NO_OP) {
logger.debug("Receive message from RemoteInterpreter Process: " + event.toString());
}
if (event.getType() == RemoteInterpreterEventType.NO_OP) {
continue;
} else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_ADD) {
AngularObject angularObject = AngularObject.fromJson(event.getData());
angularObjectRegistry.add(angularObject.getName(),
angularObject.get(), angularObject.getNoteId(), angularObject.getParagraphId());
} else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_UPDATE) {
AngularObject angularObject = AngularObject.fromJson(event.getData());
AngularObject localAngularObject = angularObjectRegistry.get(
angularObject.getName(), angularObject.getNoteId(), angularObject.getParagraphId());
if (localAngularObject instanceof RemoteAngularObject) {
// to avoid ping-pong loop
((RemoteAngularObject) localAngularObject).set(
angularObject.get(), true, false);
} else {
localAngularObject.set(angularObject.get());
}
} else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_REMOVE) {
AngularObject angularObject = AngularObject.fromJson(event.getData());
angularObjectRegistry.remove(angularObject.getName(), angularObject.getNoteId(),
angularObject.getParagraphId());
} else if (event.getType() == RemoteInterpreterEventType.RUN_INTERPRETER_CONTEXT_RUNNER) {
InterpreterContextRunner runnerFromRemote = gson.fromJson(
event.getData(), RemoteInterpreterContextRunner.class);
listener.onRemoteRunParagraph(
runnerFromRemote.getNoteId(), runnerFromRemote.getParagraphId());
} else if (event.getType() == RemoteInterpreterEventType.RESOURCE_POOL_GET_ALL) {
ResourceSet resourceSet = getAllResourcePoolExcept();
sendResourcePoolResponseGetAll(resourceSet);
} else if (event.getType() == RemoteInterpreterEventType.RESOURCE_GET) {
String resourceIdString = event.getData();
ResourceId resourceId = ResourceId.fromJson(resourceIdString);
logger.debug("RESOURCE_GET {} {}", resourceId.getResourcePoolId(), resourceId.getName());
Object o = getResource(resourceId);
sendResourceResponseGet(resourceId, o);
} else if (event.getType() == RemoteInterpreterEventType.RESOURCE_INVOKE_METHOD) {
String message = event.getData();
InvokeResourceMethodEventMessage invokeMethodMessage =
InvokeResourceMethodEventMessage.fromJson(message);
Object ret = invokeResourceMethod(invokeMethodMessage);
sendInvokeMethodResult(invokeMethodMessage, ret);
} else if (event.getType() == RemoteInterpreterEventType.OUTPUT_APPEND) {
// on output append
Map<String, String> outputAppend = gson.fromJson(
event.getData(), new TypeToken<Map<String, Object>>() {}.getType());
String noteId = (String) outputAppend.get("noteId");
String paragraphId = (String) outputAppend.get("paragraphId");
int index = Integer.parseInt(outputAppend.get("index"));
String outputToAppend = (String) outputAppend.get("data");
String appId = (String) outputAppend.get("appId");
if (appId == null) {
runner.appendBuffer(noteId, paragraphId, index, outputToAppend);
} else {
appListener.onOutputAppend(noteId, paragraphId, index, appId, outputToAppend);
}
} else if (event.getType() == RemoteInterpreterEventType.OUTPUT_UPDATE_ALL) {
Map<String, Object> outputUpdate = gson.fromJson(
event.getData(), new TypeToken<Map<String, Object>>() {}.getType());
String noteId = (String) outputUpdate.get("noteId");
String paragraphId = (String) outputUpdate.get("paragraphId");
// clear the output
List<Map<String, String>> messages =
(List<Map<String, String>>) outputUpdate.get("messages");
if (messages != null) {
listener.onOutputClear(noteId, paragraphId);
for (int i = 0; i < messages.size(); i++) {
Map<String, String> m = messages.get(i);
InterpreterResult.Type type =
InterpreterResult.Type.valueOf((String) m.get("type"));
String outputToUpdate = (String) m.get("data");
listener.onOutputUpdated(noteId, paragraphId, i, type, outputToUpdate);
}
}
} else if (event.getType() == RemoteInterpreterEventType.OUTPUT_UPDATE) {
// on output update
Map<String, String> outputAppend = gson.fromJson(
event.getData(), new TypeToken<Map<String, Object>>() {}.getType());
String noteId = (String) outputAppend.get("noteId");
String paragraphId = (String) outputAppend.get("paragraphId");
int index = Integer.parseInt(outputAppend.get("index"));
InterpreterResult.Type type =
InterpreterResult.Type.valueOf((String) outputAppend.get("type"));
String outputToUpdate = (String) outputAppend.get("data");
String appId = (String) outputAppend.get("appId");
if (appId == null) {
listener.onOutputUpdated(noteId, paragraphId, index, type, outputToUpdate);
} else {
appListener.onOutputUpdated(noteId, paragraphId, index, appId, type, outputToUpdate);
}
} else if (event.getType() == RemoteInterpreterEventType.APP_STATUS_UPDATE) {
// on output update
Map<String, String> appStatusUpdate = gson.fromJson(
event.getData(), new TypeToken<Map<String, String>>() {}.getType());
String noteId = appStatusUpdate.get("noteId");
String paragraphId = appStatusUpdate.get("paragraphId");
String appId = appStatusUpdate.get("appId");
String status = appStatusUpdate.get("status");
appListener.onStatusChange(noteId, paragraphId, appId, status);
} else if (event.getType() == RemoteInterpreterEventType.REMOTE_ZEPPELIN_SERVER_RESOURCE) {
RemoteZeppelinServerResource reqResourceBody = RemoteZeppelinServerResource.fromJson(
event.getData());
progressRemoteZeppelinControlEvent(
reqResourceBody.getResourceType(), listener, reqResourceBody);
} else if (event.getType() == RemoteInterpreterEventType.META_INFOS) {
Map<String, String> metaInfos = gson.fromJson(event.getData(),
new TypeToken<Map<String, String>>() {
}.getType());
String settingId = RemoteInterpreterUtils.
getInterpreterSettingId(interpreterGroup.getId());
listener.onMetaInfosReceived(settingId, metaInfos);
} else if (event.getType() == RemoteInterpreterEventType.PARA_INFOS) {
Map<String, String> paraInfos = gson.fromJson(event.getData(),
new TypeToken<Map<String, String>>() {
}.getType());
String noteId = paraInfos.get("noteId");
String paraId = paraInfos.get("paraId");
String settingId = RemoteInterpreterUtils.
getInterpreterSettingId(interpreterGroup.getId());
if (noteId != null && paraId != null && settingId != null) {
listener.onParaInfosReceived(noteId, paraId, settingId, paraInfos);
}
}
} catch (Exception e) {
logger.error("Can't handle event " + event, e);
}
}
try {
clearUnreadEvents(interpreterProcess.getClient());
} catch (Exception e1) {
if (shutdown) {
logger.error("Can not get RemoteInterpreterEvent because it is shutdown.");
} else {
logger.error("Can't get RemoteInterpreterEvent", e1);
}
}
if (appendFuture != null) {
appendFuture.cancel(true);
}
}
private void clearUnreadEvents(Client client) throws TException {
while (client.getEvent().getType() != RemoteInterpreterEventType.NO_OP) {}
}
private void progressRemoteZeppelinControlEvent(
RemoteZeppelinServerResource.Type resourceType,
RemoteInterpreterProcessListener remoteWorksEventListener,
RemoteZeppelinServerResource reqResourceBody) throws Exception {
boolean broken = false;
final Gson gson = new Gson();
final String eventOwnerKey = reqResourceBody.getOwnerKey();
try {
if (resourceType == RemoteZeppelinServerResource.Type.PARAGRAPH_RUNNERS) {
final List<ZeppelinServerResourceParagraphRunner> remoteRunners = new LinkedList<>();
ZeppelinServerResourceParagraphRunner reqRunnerContext =
new ZeppelinServerResourceParagraphRunner();
Map<String, Object> reqResourceMap = (Map<String, Object>) reqResourceBody.getData();
String noteId = (String) reqResourceMap.get("noteId");
String paragraphId = (String) reqResourceMap.get("paragraphId");
reqRunnerContext.setNoteId(noteId);
reqRunnerContext.setParagraphId(paragraphId);
RemoteInterpreterProcessListener.RemoteWorksEventListener callBackEvent =
new RemoteInterpreterProcessListener.RemoteWorksEventListener() {
@Override
public void onFinished(Object resultObject) {
if (resultObject != null && resultObject instanceof List) {
List<InterpreterContextRunner> runnerList =
(List<InterpreterContextRunner>) resultObject;
for (InterpreterContextRunner r : runnerList) {
remoteRunners.add(
new ZeppelinServerResourceParagraphRunner(r.getNoteId(), r.getParagraphId())
);
}
final RemoteZeppelinServerResource resResource =
new RemoteZeppelinServerResource();
resResource.setOwnerKey(eventOwnerKey);
resResource.setResourceType(RemoteZeppelinServerResource.Type.PARAGRAPH_RUNNERS);
resResource.setData(remoteRunners);
interpreterProcess.callRemoteFunction(
new RemoteInterpreterProcess.RemoteFunction<Void>() {
@Override
public Void call(Client client) throws Exception {
client.onReceivedZeppelinResource(resResource.toJson());
return null;
}
}
);
}
}
@Override
public void onError() {
logger.info("onGetParagraphRunners onError");
}
};
remoteWorksEventListener.onGetParagraphRunners(
reqRunnerContext.getNoteId(), reqRunnerContext.getParagraphId(), callBackEvent);
}
} catch (Exception e) {
logger.error("Can't get RemoteInterpreterEvent", e);
waitQuietly();
}
}
private void sendResourcePoolResponseGetAll(final ResourceSet resourceSet) {
interpreterProcess.callRemoteFunction(
new RemoteInterpreterProcess.RemoteFunction<Void>() {
@Override
public Void call(Client client) throws Exception {
List<String> resourceList = new LinkedList<>();
for (Resource r : resourceSet) {
resourceList.add(r.toJson());
}
client.resourcePoolResponseGetAll(resourceList);
return null;
}
}
);
}
private ResourceSet getAllResourcePoolExcept() {
ResourceSet resourceSet = new ResourceSet();
for (ManagedInterpreterGroup intpGroup : interpreterGroup.getInterpreterSetting()
.getInterpreterSettingManager().getAllInterpreterGroup()) {
if (intpGroup.getId().equals(interpreterGroup.getId())) {
continue;
}
RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
if (remoteInterpreterProcess == null) {
ResourcePool localPool = intpGroup.getResourcePool();
if (localPool != null) {
resourceSet.addAll(localPool.getAll());
}
} else if (interpreterProcess.isRunning()) {
List<String> resourceList = remoteInterpreterProcess.callRemoteFunction(
new RemoteInterpreterProcess.RemoteFunction<List<String>>() {
@Override
public List<String> call(Client client) throws Exception {
return client.resourcePoolGetAll();
}
}
);
for (String res : resourceList) {
resourceSet.add(Resource.fromJson(res));
}
}
}
return resourceSet;
}
private void sendResourceResponseGet(final ResourceId resourceId, final Object o) {
interpreterProcess.callRemoteFunction(
new RemoteInterpreterProcess.RemoteFunction<Void>() {
@Override
public Void call(Client client) throws Exception {
String rid = resourceId.toJson();
ByteBuffer obj;
if (o == null) {
obj = ByteBuffer.allocate(0);
} else {
obj = Resource.serializeObject(o);
}
client.resourceResponseGet(rid, obj);
return null;
}
}
);
}
private Object getResource(final ResourceId resourceId) {
ManagedInterpreterGroup intpGroup = interpreterGroup.getInterpreterSetting()
.getInterpreterSettingManager()
.getInterpreterGroupById(resourceId.getResourcePoolId());
if (intpGroup == null) {
return null;
}
RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
ByteBuffer buffer = remoteInterpreterProcess.callRemoteFunction(
new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() {
@Override
public ByteBuffer call(Client client) throws Exception {
return client.resourceGet(
resourceId.getNoteId(),
resourceId.getParagraphId(),
resourceId.getName());
}
}
);
try {
Object o = Resource.deserializeObject(buffer);
return o;
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return null;
}
public void sendInvokeMethodResult(final InvokeResourceMethodEventMessage message,
final Object o) {
interpreterProcess.callRemoteFunction(
new RemoteInterpreterProcess.RemoteFunction<Void>() {
@Override
public Void call(Client client) throws Exception {
String invokeMessage = message.toJson();
ByteBuffer obj;
if (o == null) {
obj = ByteBuffer.allocate(0);
} else {
obj = Resource.serializeObject(o);
}
client.resourceResponseInvokeMethod(invokeMessage, obj);
return null;
}
}
);
}
private Object invokeResourceMethod(final InvokeResourceMethodEventMessage message) {
final ResourceId resourceId = message.resourceId;
ManagedInterpreterGroup intpGroup = interpreterGroup.getInterpreterSetting()
.getInterpreterSettingManager().getInterpreterGroupById(resourceId.getResourcePoolId());
if (intpGroup == null) {
return null;
}
RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess();
if (remoteInterpreterProcess == null) {
ResourcePool localPool = intpGroup.getResourcePool();
if (localPool != null) {
Resource res = localPool.get(resourceId.getName());
if (res != null) {
try {
return res.invokeMethod(
message.methodName,
message.getParamTypes(),
message.params,
message.returnResourceName);
} catch (Exception e) {
logger.error(e.getMessage(), e);
return null;
}
} else {
// object is null. can't invoke any method
logger.error("Can't invoke method {} on null object", message.methodName);
return null;
}
} else {
logger.error("no resource pool");
return null;
}
} else if (interpreterProcess.isRunning()) {
ByteBuffer res = interpreterProcess.callRemoteFunction(
new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() {
@Override
public ByteBuffer call(Client client) throws Exception {
return client.resourceInvokeMethod(
resourceId.getNoteId(),
resourceId.getParagraphId(),
resourceId.getName(),
message.toJson());
}
}
);
try {
return Resource.deserializeObject(res);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return null;
}
return null;
}
private void waitQuietly() {
try {
synchronized (this) {
wait(1000);
}
} catch (InterruptedException ignored) {
logger.info("Error in RemoteInterpreterEventPoller while waitQuietly : ", ignored);
}
}
public void shutdown() {
shutdown = true;
synchronized (this) {
notify();
}
}
}