| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.zeppelin.interpreter; |
| |
| import java.util.Collection; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Random; |
| import java.util.concurrent.ConcurrentHashMap; |
| |
| import org.apache.zeppelin.display.AngularObjectRegistry; |
| import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; |
| import org.apache.zeppelin.resource.ResourcePool; |
| import org.apache.zeppelin.scheduler.Scheduler; |
| import org.apache.zeppelin.scheduler.SchedulerFactory; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * InterpreterGroup is list of interpreters in the same interpreter group. |
| * For example spark, pyspark, sql interpreters are in the same 'spark' group |
| * and InterpreterGroup will have reference to these all interpreters. |
| * |
| * Remember, list of interpreters are dedicated to a session. Session could be shared across user |
| * or notes, so the sessionId could be user or noteId or their combination. |
| * So InterpreterGroup internally manages map of [interpreterSessionKey(noteId, user, or |
| * their combination), list of interpreters] |
| * |
| * A InterpreterGroup runs on interpreter process. |
| * And unit of interpreter instantiate, restart, bind, unbind. |
| */ |
| public class InterpreterGroup extends ConcurrentHashMap<String, List<Interpreter>> { |
| String id; |
| |
| private static final Logger LOGGER = LoggerFactory.getLogger(InterpreterGroup.class); |
| |
| AngularObjectRegistry angularObjectRegistry; |
| InterpreterHookRegistry hookRegistry; |
| RemoteInterpreterProcess remoteInterpreterProcess; // attached remote interpreter process |
| ResourcePool resourcePool; |
| boolean angularRegistryPushed = false; |
| |
| // map [notebook session, Interpreters in the group], to support per note session interpreters |
| //Map<String, List<Interpreter>> interpreters = new ConcurrentHashMap<String, |
| // List<Interpreter>>(); |
| |
| private static final Map<String, InterpreterGroup> allInterpreterGroups = |
| new ConcurrentHashMap<>(); |
| |
| public static InterpreterGroup getByInterpreterGroupId(String id) { |
| return allInterpreterGroups.get(id); |
| } |
| |
| public static Collection<InterpreterGroup> getAll() { |
| return new LinkedList(allInterpreterGroups.values()); |
| } |
| |
| /** |
| * Create InterpreterGroup with given id |
| * @param id |
| */ |
| public InterpreterGroup(String id) { |
| this.id = id; |
| allInterpreterGroups.put(id, this); |
| } |
| |
| /** |
| * Create InterpreterGroup with autogenerated id |
| */ |
| public InterpreterGroup() { |
| getId(); |
| allInterpreterGroups.put(id, this); |
| } |
| |
| private static String generateId() { |
| return "InterpreterGroup_" + System.currentTimeMillis() + "_" |
| + new Random().nextInt(); |
| } |
| |
| public String getId() { |
| synchronized (this) { |
| if (id == null) { |
| id = generateId(); |
| } |
| return id; |
| } |
| } |
| |
| /** |
| * Get combined property of all interpreters in this group |
| * @return |
| */ |
| public Properties getProperty() { |
| Properties p = new Properties(); |
| |
| for (List<Interpreter> intpGroupForASession : this.values()) { |
| for (Interpreter intp : intpGroupForASession) { |
| p.putAll(intp.getProperty()); |
| } |
| // it's okay to break here while every List<Interpreters> will have the same property set |
| break; |
| } |
| return p; |
| } |
| |
| public AngularObjectRegistry getAngularObjectRegistry() { |
| return angularObjectRegistry; |
| } |
| |
| public void setAngularObjectRegistry(AngularObjectRegistry angularObjectRegistry) { |
| this.angularObjectRegistry = angularObjectRegistry; |
| } |
| |
| public InterpreterHookRegistry getInterpreterHookRegistry() { |
| return hookRegistry; |
| } |
| |
| public void setInterpreterHookRegistry(InterpreterHookRegistry hookRegistry) { |
| this.hookRegistry = hookRegistry; |
| } |
| |
| public RemoteInterpreterProcess getRemoteInterpreterProcess() { |
| return remoteInterpreterProcess; |
| } |
| |
| public void setRemoteInterpreterProcess(RemoteInterpreterProcess remoteInterpreterProcess) { |
| this.remoteInterpreterProcess = remoteInterpreterProcess; |
| } |
| |
| /** |
| * Close all interpreter instances in this group |
| */ |
| public void close() { |
| LOGGER.info("Close interpreter group " + getId()); |
| List<Interpreter> intpToClose = new LinkedList<>(); |
| for (List<Interpreter> intpGroupForSession : this.values()) { |
| intpToClose.addAll(intpGroupForSession); |
| } |
| close(intpToClose); |
| |
| // make sure remote interpreter process terminates |
| if (remoteInterpreterProcess != null) { |
| while (remoteInterpreterProcess.referenceCount() > 0) { |
| remoteInterpreterProcess.dereference(); |
| } |
| remoteInterpreterProcess = null; |
| } |
| allInterpreterGroups.remove(id); |
| } |
| |
| /** |
| * Close all interpreter instances in this group for the session |
| * @param sessionId |
| */ |
| public void close(String sessionId) { |
| LOGGER.info("Close interpreter group " + getId() + " for session: " + sessionId); |
| final List<Interpreter> intpForSession = this.get(sessionId); |
| |
| close(intpForSession); |
| } |
| |
| private void close(final Collection<Interpreter> intpToClose) { |
| close(null, null, null, intpToClose); |
| } |
| |
| public void close(final Map<String, InterpreterGroup> interpreterGroupRef, |
| final String processKey, final String sessionKey) { |
| LOGGER.info("Close interpreter group " + getId() + " for session: " + sessionKey); |
| close(interpreterGroupRef, processKey, sessionKey, this.get(sessionKey)); |
| } |
| |
| private void close(final Map<String, InterpreterGroup> interpreterGroupRef, |
| final String processKey, final String sessionKey, final Collection<Interpreter> intpToClose) { |
| if (intpToClose == null) { |
| return; |
| } |
| Thread t = new Thread() { |
| public void run() { |
| for (Interpreter interpreter : intpToClose) { |
| Scheduler scheduler = interpreter.getScheduler(); |
| interpreter.close(); |
| |
| if (null != scheduler) { |
| SchedulerFactory.singleton().removeScheduler(scheduler.getName()); |
| } |
| } |
| |
| if (remoteInterpreterProcess != null) { |
| //TODO(jl): Because interpreter.close() runs as a seprate thread, we cannot guarantee |
| // refernceCount is a proper value. And as the same reason, we must not call |
| // remoteInterpreterProcess.dereference twice - this method also be called by |
| // interpreter.close(). |
| |
| // remoteInterpreterProcess.dereference(); |
| if (remoteInterpreterProcess.referenceCount() <= 0) { |
| remoteInterpreterProcess = null; |
| allInterpreterGroups.remove(id); |
| } |
| } |
| |
| // TODO(jl): While closing interpreters in a same session, we should remove after all |
| // interpreters are removed. OMG. It's too dirty!! |
| if (null != interpreterGroupRef && null != processKey && null != sessionKey) { |
| InterpreterGroup interpreterGroup = interpreterGroupRef.get(processKey); |
| if (1 == interpreterGroup.size() && interpreterGroup.containsKey(sessionKey)) { |
| interpreterGroupRef.remove(processKey); |
| } else { |
| interpreterGroup.remove(sessionKey); |
| } |
| } |
| } |
| }; |
| |
| t.start(); |
| try { |
| t.join(); |
| } catch (InterruptedException e) { |
| LOGGER.error("Can't close interpreter: {}", getId(), e); |
| } |
| |
| |
| } |
| |
| /** |
| * Close all interpreter instances in this group |
| */ |
| public void shutdown() { |
| LOGGER.info("Close interpreter group " + getId()); |
| |
| // make sure remote interpreter process terminates |
| if (remoteInterpreterProcess != null) { |
| while (remoteInterpreterProcess.referenceCount() > 0) { |
| remoteInterpreterProcess.dereference(); |
| } |
| remoteInterpreterProcess = null; |
| } |
| allInterpreterGroups.remove(id); |
| |
| List<Interpreter> intpToClose = new LinkedList<>(); |
| for (List<Interpreter> intpGroupForSession : this.values()) { |
| intpToClose.addAll(intpGroupForSession); |
| } |
| close(intpToClose); |
| } |
| |
| public void setResourcePool(ResourcePool resourcePool) { |
| this.resourcePool = resourcePool; |
| } |
| |
| public ResourcePool getResourcePool() { |
| return resourcePool; |
| } |
| |
| public boolean isAngularRegistryPushed() { |
| return angularRegistryPushed; |
| } |
| |
| public void setAngularRegistryPushed(boolean angularRegistryPushed) { |
| this.angularRegistryPushed = angularRegistryPushed; |
| } |
| } |