| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.zeppelin.interpreter.remote; |
| |
| |
| import java.io.IOException; |
| import java.lang.reflect.Constructor; |
| import java.lang.reflect.InvocationTargetException; |
| import java.net.URL; |
| import java.nio.ByteBuffer; |
| import java.util.*; |
| |
| import org.apache.thrift.TException; |
| import org.apache.thrift.server.TThreadPoolServer; |
| import org.apache.thrift.transport.TServerSocket; |
| import org.apache.thrift.transport.TTransportException; |
| import org.apache.zeppelin.display.*; |
| import org.apache.zeppelin.interpreter.*; |
| import org.apache.zeppelin.interpreter.InterpreterResult.Code; |
| import org.apache.zeppelin.interpreter.thrift.*; |
| import org.apache.zeppelin.resource.*; |
| import org.apache.zeppelin.scheduler.Job; |
| import org.apache.zeppelin.scheduler.Job.Status; |
| import org.apache.zeppelin.scheduler.JobListener; |
| import org.apache.zeppelin.scheduler.JobProgressPoller; |
| import org.apache.zeppelin.scheduler.Scheduler; |
| import org.apache.zeppelin.scheduler.SchedulerFactory; |
| import org.apache.zeppelin.user.AuthenticationInfo; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.gson.Gson; |
| import com.google.gson.reflect.TypeToken; |
| |
/**
 * Entry point for the interpreter process.
 * Accepts Thrift connections from ZeppelinServer.
 */
| public class RemoteInterpreterServer |
| extends Thread |
| implements RemoteInterpreterService.Iface, AngularObjectRegistryListener { |
| Logger logger = LoggerFactory.getLogger(RemoteInterpreterServer.class); |
| |
| InterpreterGroup interpreterGroup; |
| AngularObjectRegistry angularObjectRegistry; |
| DistributedResourcePool resourcePool; |
| Gson gson = new Gson(); |
| |
| RemoteInterpreterService.Processor<RemoteInterpreterServer> processor; |
| RemoteInterpreterServer handler; |
| private int port; |
| private TThreadPoolServer server; |
| |
| RemoteInterpreterEventClient eventClient = new RemoteInterpreterEventClient(); |
| |
| public RemoteInterpreterServer(int port) throws TTransportException { |
| this.port = port; |
| |
| processor = new RemoteInterpreterService.Processor<RemoteInterpreterServer>(this); |
| TServerSocket serverTransport = new TServerSocket(port); |
| server = new TThreadPoolServer( |
| new TThreadPoolServer.Args(serverTransport).processor(processor)); |
| } |
| |
| @Override |
| public void run() { |
| logger.info("Starting remote interpreter server on port {}", port); |
| server.serve(); |
| } |
| |
| @Override |
| public void shutdown() throws TException { |
| if (interpreterGroup != null) { |
| interpreterGroup.close(); |
| interpreterGroup.destroy(); |
| } |
| |
| server.stop(); |
| |
| // server.stop() does not always finish server.serve() loop |
| // sometimes server.serve() is hanging even after server.stop() call. |
| // this case, need to force kill the process |
| |
| long startTime = System.currentTimeMillis(); |
| while (System.currentTimeMillis() - startTime < 2000 && server.isServing()) { |
| try { |
| Thread.sleep(300); |
| } catch (InterruptedException e) { |
| logger.info("Exception in RemoteInterpreterServer while shutdown, Thread.sleep", e); |
| } |
| } |
| |
| if (server.isServing()) { |
| System.exit(0); |
| } |
| } |
| |
| public int getPort() { |
| return port; |
| } |
| |
| public boolean isRunning() { |
| if (server == null) { |
| return false; |
| } else { |
| return server.isServing(); |
| } |
| } |
| |
| |
| public static void main(String[] args) |
| throws TTransportException, InterruptedException { |
| int port = Integer.parseInt(args[0]); |
| RemoteInterpreterServer remoteInterpreterServer = new RemoteInterpreterServer(port); |
| remoteInterpreterServer.start(); |
| remoteInterpreterServer.join(); |
| System.exit(0); |
| } |
| |
| |
| @Override |
| public void createInterpreter(String interpreterGroupId, String noteId, String |
| className, |
| Map<String, String> properties) throws TException { |
| if (interpreterGroup == null) { |
| interpreterGroup = new InterpreterGroup(interpreterGroupId); |
| angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this); |
| resourcePool = new DistributedResourcePool(interpreterGroup.getId(), eventClient); |
| interpreterGroup.setAngularObjectRegistry(angularObjectRegistry); |
| interpreterGroup.setResourcePool(resourcePool); |
| } |
| |
| try { |
| Class<Interpreter> replClass = (Class<Interpreter>) Object.class.forName(className); |
| Properties p = new Properties(); |
| p.putAll(properties); |
| setSystemProperty(p); |
| |
| Constructor<Interpreter> constructor = |
| replClass.getConstructor(new Class[] {Properties.class}); |
| Interpreter repl = constructor.newInstance(p); |
| |
| repl.setClassloaderUrls(new URL[]{}); |
| |
| synchronized (interpreterGroup) { |
| List<Interpreter> interpreters = interpreterGroup.get(noteId); |
| if (interpreters == null) { |
| interpreters = new LinkedList<Interpreter>(); |
| interpreterGroup.put(noteId, interpreters); |
| } |
| |
| interpreters.add(new LazyOpenInterpreter(repl)); |
| } |
| |
| logger.info("Instantiate interpreter {}", className); |
| repl.setInterpreterGroup(interpreterGroup); |
| } catch (ClassNotFoundException | NoSuchMethodException | SecurityException |
| | InstantiationException | IllegalAccessException |
| | IllegalArgumentException | InvocationTargetException e) { |
| logger.error(e.toString(), e); |
| throw new TException(e); |
| } |
| } |
| |
| private void setSystemProperty(Properties properties) { |
| for (Object key : properties.keySet()) { |
| if (!RemoteInterpreter.isEnvString((String) key)) { |
| String value = properties.getProperty((String) key); |
| if (value == null || value.isEmpty()) { |
| System.clearProperty((String) key); |
| } else { |
| System.setProperty((String) key, properties.getProperty((String) key)); |
| } |
| } |
| } |
| } |
| |
| private Interpreter getInterpreter(String noteId, String className) throws TException { |
| if (interpreterGroup == null) { |
| throw new TException( |
| new InterpreterException("Interpreter instance " + className + " not created")); |
| } |
| synchronized (interpreterGroup) { |
| List<Interpreter> interpreters = interpreterGroup.get(noteId); |
| if (interpreters == null) { |
| throw new TException( |
| new InterpreterException("Interpreter " + className + " not initialized")); |
| } |
| for (Interpreter inp : interpreters) { |
| if (inp.getClassName().equals(className)) { |
| return inp; |
| } |
| } |
| } |
| throw new TException(new InterpreterException("Interpreter instance " |
| + className + " not found")); |
| } |
| |
| @Override |
| public void open(String noteId, String className) throws TException { |
| Interpreter intp = getInterpreter(noteId, className); |
| intp.open(); |
| } |
| |
| @Override |
| public void close(String noteId, String className) throws TException { |
| synchronized (interpreterGroup) { |
| List<Interpreter> interpreters = interpreterGroup.get(noteId); |
| if (interpreters != null) { |
| Iterator<Interpreter> it = interpreters.iterator(); |
| while (it.hasNext()) { |
| Interpreter inp = it.next(); |
| if (inp.getClassName().equals(className)) { |
| inp.close(); |
| |
| Scheduler scheduler = inp.getScheduler(); |
| if (scheduler != null) { |
| SchedulerFactory.singleton().removeScheduler(scheduler.getName()); |
| } |
| |
| it.remove(); |
| break; |
| } |
| } |
| |
| if (interpreters.isEmpty()) { |
| interpreterGroup.remove(noteId); |
| } |
| } |
| } |
| } |
| |
| |
| @Override |
| public RemoteInterpreterResult interpret(String noteId, String className, String st, |
| RemoteInterpreterContext interpreterContext) throws TException { |
| logger.debug("st: {}", st); |
| Interpreter intp = getInterpreter(noteId, className); |
| InterpreterContext context = convert(interpreterContext); |
| |
| Scheduler scheduler = intp.getScheduler(); |
| InterpretJobListener jobListener = new InterpretJobListener(); |
| InterpretJob job = new InterpretJob( |
| interpreterContext.getParagraphId(), |
| "remoteInterpretJob_" + System.currentTimeMillis(), |
| jobListener, |
| JobProgressPoller.DEFAULT_INTERVAL_MSEC, |
| intp, |
| st, |
| context); |
| |
| scheduler.submit(job); |
| |
| while (!job.isTerminated()) { |
| synchronized (jobListener) { |
| try { |
| jobListener.wait(1000); |
| } catch (InterruptedException e) { |
| logger.info("Exception in RemoteInterpreterServer while interpret, jobListener.wait", e); |
| } |
| } |
| } |
| |
| InterpreterResult result; |
| if (job.getStatus() == Status.ERROR) { |
| result = new InterpreterResult(Code.ERROR, Job.getStack(job.getException())); |
| } else { |
| result = (InterpreterResult) job.getReturn(); |
| |
| // in case of job abort in PENDING status, result can be null |
| if (result == null) { |
| result = new InterpreterResult(Code.KEEP_PREVIOUS_RESULT); |
| } |
| } |
| return convert(result, |
| context.getConfig(), |
| context.getGui()); |
| } |
| |
| |
| class InterpretJobListener implements JobListener { |
| |
| @Override |
| public void onProgressUpdate(Job job, int progress) { |
| } |
| |
| @Override |
| public void beforeStatusChange(Job job, Status before, Status after) { |
| } |
| |
| @Override |
| public void afterStatusChange(Job job, Status before, Status after) { |
| synchronized (this) { |
| notifyAll(); |
| } |
| } |
| } |
| |
| class InterpretJob extends Job { |
| |
| private Interpreter interpreter; |
| private String script; |
| private InterpreterContext context; |
| private Map<String, Object> infos; |
| |
| public InterpretJob( |
| String jobId, |
| String jobName, |
| JobListener listener, |
| long progressUpdateIntervalMsec, |
| Interpreter interpreter, |
| String script, |
| InterpreterContext context) { |
| super(jobId, jobName, listener, progressUpdateIntervalMsec); |
| this.interpreter = interpreter; |
| this.script = script; |
| this.context = context; |
| } |
| |
| @Override |
| public int progress() { |
| return 0; |
| } |
| |
| @Override |
| public Map<String, Object> info() { |
| if (infos == null) { |
| infos = new HashMap<>(); |
| } |
| return infos; |
| } |
| |
| @Override |
| protected Object jobRun() throws Throwable { |
| try { |
| InterpreterContext.set(context); |
| InterpreterResult result = interpreter.interpret(script, context); |
| |
| // data from context.out is prepended to InterpreterResult if both defined |
| String message = ""; |
| |
| context.out.flush(); |
| InterpreterResult.Type outputType = context.out.getType(); |
| byte[] interpreterOutput = context.out.toByteArray(); |
| context.out.clear(); |
| |
| if (interpreterOutput != null && interpreterOutput.length > 0) { |
| message = new String(interpreterOutput); |
| } |
| |
| String interpreterResultMessage = result.message(); |
| |
| InterpreterResult combinedResult; |
| if (interpreterResultMessage != null && !interpreterResultMessage.isEmpty()) { |
| message += interpreterResultMessage; |
| combinedResult = new InterpreterResult(result.code(), result.type(), message); |
| } else { |
| combinedResult = new InterpreterResult(result.code(), outputType, message); |
| } |
| |
| // put result into resource pool |
| context.getResourcePool().put( |
| context.getNoteId(), |
| context.getParagraphId(), |
| WellKnownResourceName.ParagraphResult.toString(), |
| combinedResult); |
| return combinedResult; |
| } finally { |
| InterpreterContext.remove(); |
| } |
| } |
| |
| @Override |
| protected boolean jobAbort() { |
| return false; |
| } |
| } |
| |
| |
| @Override |
| public void cancel(String noteId, String className, RemoteInterpreterContext interpreterContext) |
| throws TException { |
| logger.info("cancel {} {}", className, interpreterContext.getParagraphId()); |
| Interpreter intp = getInterpreter(noteId, className); |
| String jobId = interpreterContext.getParagraphId(); |
| Job job = intp.getScheduler().removeFromWaitingQueue(jobId); |
| |
| if (job != null) { |
| job.setStatus(Status.ABORT); |
| } else { |
| intp.cancel(convert(interpreterContext)); |
| } |
| } |
| |
| @Override |
| public int getProgress(String noteId, String className, |
| RemoteInterpreterContext interpreterContext) |
| throws TException { |
| Interpreter intp = getInterpreter(noteId, className); |
| return intp.getProgress(convert(interpreterContext)); |
| } |
| |
| |
| @Override |
| public String getFormType(String noteId, String className) throws TException { |
| Interpreter intp = getInterpreter(noteId, className); |
| return intp.getFormType().toString(); |
| } |
| |
| @Override |
| public List<InterpreterCompletion> completion(String noteId, |
| String className, String buf, int cursor) |
| throws TException { |
| Interpreter intp = getInterpreter(noteId, className); |
| List completion = intp.completion(buf, cursor); |
| return completion; |
| } |
| |
| private InterpreterContext convert(RemoteInterpreterContext ric) { |
| List<InterpreterContextRunner> contextRunners = new LinkedList<InterpreterContextRunner>(); |
| List<InterpreterContextRunner> runners = gson.fromJson(ric.getRunners(), |
| new TypeToken<List<RemoteInterpreterContextRunner>>() { |
| }.getType()); |
| |
| for (InterpreterContextRunner r : runners) { |
| contextRunners.add(new ParagraphRunner(this, r.getNoteId(), r.getParagraphId())); |
| } |
| |
| return new InterpreterContext( |
| ric.getNoteId(), |
| ric.getParagraphId(), |
| ric.getParagraphTitle(), |
| ric.getParagraphText(), |
| gson.fromJson(ric.getAuthenticationInfo(), AuthenticationInfo.class), |
| (Map<String, Object>) gson.fromJson(ric.getConfig(), |
| new TypeToken<Map<String, Object>>() {}.getType()), |
| gson.fromJson(ric.getGui(), GUI.class), |
| interpreterGroup.getAngularObjectRegistry(), |
| interpreterGroup.getResourcePool(), |
| contextRunners, createInterpreterOutput(ric.getNoteId(), ric.getParagraphId())); |
| } |
| |
| |
| private InterpreterOutput createInterpreterOutput(final String noteId, final String paragraphId) { |
| return new InterpreterOutput(new InterpreterOutputListener() { |
| @Override |
| public void onAppend(InterpreterOutput out, byte[] line) { |
| logger.debug("Output Append:" + new String(line)); |
| eventClient.onInterpreterOutputAppend(noteId, paragraphId, new String(line)); |
| } |
| |
| @Override |
| public void onUpdate(InterpreterOutput out, byte[] output) { |
| logger.debug("Output Update:" + new String(output)); |
| eventClient.onInterpreterOutputUpdate(noteId, paragraphId, new String(output)); |
| } |
| }); |
| } |
| |
| |
| static class ParagraphRunner extends InterpreterContextRunner { |
| |
| private transient RemoteInterpreterServer server; |
| |
| public ParagraphRunner(RemoteInterpreterServer server, String noteId, String paragraphId) { |
| super(noteId, paragraphId); |
| this.server = server; |
| } |
| |
| @Override |
| public void run() { |
| server.eventClient.run(this); |
| } |
| } |
| |
| private RemoteInterpreterResult convert(InterpreterResult result, |
| Map<String, Object> config, GUI gui) { |
| return new RemoteInterpreterResult( |
| result.code().name(), |
| result.type().name(), |
| result.message(), |
| gson.toJson(config), |
| gson.toJson(gui)); |
| } |
| |
| @Override |
| public String getStatus(String noteId, String jobId) |
| throws TException { |
| if (interpreterGroup == null) { |
| return "Unknown"; |
| } |
| |
| synchronized (interpreterGroup) { |
| List<Interpreter> interpreters = interpreterGroup.get(noteId); |
| if (interpreters == null) { |
| return "Unknown"; |
| } |
| |
| for (Interpreter intp : interpreters) { |
| for (Job job : intp.getScheduler().getJobsRunning()) { |
| if (jobId.equals(job.getId())) { |
| return job.getStatus().name(); |
| } |
| } |
| |
| for (Job job : intp.getScheduler().getJobsWaiting()) { |
| if (jobId.equals(job.getId())) { |
| return job.getStatus().name(); |
| } |
| } |
| } |
| } |
| return "Unknown"; |
| } |
| |
| |
| |
| @Override |
| public void onAdd(String interpreterGroupId, AngularObject object) { |
| eventClient.angularObjectAdd(object); |
| } |
| |
| @Override |
| public void onUpdate(String interpreterGroupId, AngularObject object) { |
| eventClient.angularObjectUpdate(object); |
| } |
| |
| @Override |
| public void onRemove(String interpreterGroupId, String name, String noteId, String paragraphId) { |
| eventClient.angularObjectRemove(name, noteId, paragraphId); |
| } |
| |
| |
| /** |
| * Poll event from RemoteInterpreterEventPoller |
| * @return |
| * @throws TException |
| */ |
| @Override |
| public RemoteInterpreterEvent getEvent() throws TException { |
| return eventClient.pollEvent(); |
| } |
| |
| /** |
| * called when object is updated in client (web) side. |
| * @param name |
| * @param noteId noteId where the update issues |
| * @param paragraphId paragraphId where the update issues |
| * @param object |
| * @throws TException |
| */ |
| @Override |
| public void angularObjectUpdate(String name, String noteId, String paragraphId, String object) |
| throws TException { |
| AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry(); |
| // first try local objects |
| AngularObject ao = registry.get(name, noteId, paragraphId); |
| if (ao == null) { |
| logger.debug("Angular object {} not exists", name); |
| return; |
| } |
| |
| if (object == null) { |
| ao.set(null, false); |
| return; |
| } |
| |
| Object oldObject = ao.get(); |
| Object value = null; |
| if (oldObject != null) { // first try with previous object's type |
| try { |
| value = gson.fromJson(object, oldObject.getClass()); |
| ao.set(value, false); |
| return; |
| } catch (Exception e) { |
| // it's not a previous object's type. proceed to treat as a generic type |
| logger.debug(e.getMessage(), e); |
| } |
| } |
| |
| // Generic java object type for json. |
| if (value == null) { |
| try { |
| value = gson.fromJson(object, |
| new TypeToken<Map<String, Object>>() { |
| }.getType()); |
| } catch (Exception e) { |
| // it's not a generic json object, too. okay, proceed to threat as a string type |
| logger.debug(e.getMessage(), e); |
| } |
| } |
| |
| // try string object type at last |
| if (value == null) { |
| value = gson.fromJson(object, String.class); |
| } |
| |
| ao.set(value, false); |
| } |
| |
| /** |
| * When zeppelinserver initiate angular object add. |
| * Dont't need to emit event to zeppelin server |
| */ |
| @Override |
| public void angularObjectAdd(String name, String noteId, String paragraphId, String object) |
| throws TException { |
| AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry(); |
| // first try local objects |
| AngularObject ao = registry.get(name, noteId, paragraphId); |
| if (ao != null) { |
| angularObjectUpdate(name, noteId, paragraphId, object); |
| return; |
| } |
| |
| // Generic java object type for json. |
| Object value = null; |
| try { |
| value = gson.fromJson(object, |
| new TypeToken<Map<String, Object>>() { |
| }.getType()); |
| } catch (Exception e) { |
| // it's okay. proceed to treat object as a string |
| logger.debug(e.getMessage(), e); |
| } |
| |
| // try string object type at last |
| if (value == null) { |
| value = gson.fromJson(object, String.class); |
| } |
| |
| registry.add(name, value, noteId, paragraphId, false); |
| } |
| |
| @Override |
| public void angularObjectRemove(String name, String noteId, String paragraphId) throws |
| TException { |
| AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry(); |
| registry.remove(name, noteId, paragraphId, false); |
| } |
| |
| @Override |
| public void resourcePoolResponseGetAll(List<String> resources) throws TException { |
| eventClient.putResponseGetAllResources(resources); |
| } |
| |
| /** |
| * Get payload of resource from remote |
| * @param resourceId json serialized ResourceId |
| * @param object java serialized of the object |
| * @throws TException |
| */ |
| @Override |
| public void resourceResponseGet(String resourceId, ByteBuffer object) throws TException { |
| eventClient.putResponseGetResource(resourceId, object); |
| } |
| |
| @Override |
| public List<String> resourcePoolGetAll() throws TException { |
| logger.debug("Request getAll from ZeppelinServer"); |
| |
| ResourceSet resourceSet = resourcePool.getAll(false); |
| List<String> result = new LinkedList<String>(); |
| Gson gson = new Gson(); |
| |
| for (Resource r : resourceSet) { |
| result.add(gson.toJson(r)); |
| } |
| |
| return result; |
| } |
| |
| @Override |
| public boolean resourceRemove(String noteId, String paragraphId, String resourceName) |
| throws TException { |
| Resource resource = resourcePool.remove(noteId, paragraphId, resourceName); |
| return resource != null; |
| } |
| |
| @Override |
| public ByteBuffer resourceGet(String noteId, String paragraphId, String resourceName) |
| throws TException { |
| logger.debug("Request resourceGet {} from ZeppelinServer", resourceName); |
| Resource resource = resourcePool.get(noteId, paragraphId, resourceName, false); |
| |
| if (resource == null || resource.get() == null || !resource.isSerializable()) { |
| return ByteBuffer.allocate(0); |
| } else { |
| try { |
| return Resource.serializeObject(resource.get()); |
| } catch (IOException e) { |
| logger.error(e.getMessage(), e); |
| return ByteBuffer.allocate(0); |
| } |
| } |
| } |
| |
| @Override |
| public void angularRegistryPush(String registryAsString) throws TException { |
| try { |
| Map<String, Map<String, AngularObject>> deserializedRegistry = gson |
| .fromJson(registryAsString, |
| new TypeToken<Map<String, Map<String, AngularObject>>>() { }.getType()); |
| interpreterGroup.getAngularObjectRegistry().setRegistry(deserializedRegistry); |
| } catch (Exception e) { |
| logger.info("Exception in RemoteInterpreterServer while angularRegistryPush, nolock", e); |
| } |
| } |
| } |