| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.zeppelin.interpreter; |
| |
| import com.google.common.base.Joiner; |
| import java.io.BufferedReader; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FileNotFoundException; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.InputStreamReader; |
| import java.io.OutputStreamWriter; |
| import java.lang.reflect.Constructor; |
| import java.lang.reflect.InvocationTargetException; |
| import java.lang.reflect.Type; |
| import java.net.MalformedURLException; |
| import java.net.URL; |
| import java.net.URLClassLoader; |
| import java.nio.file.DirectoryStream; |
| import java.nio.file.Files; |
| import java.nio.file.Path; |
| import java.nio.file.Paths; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.nio.file.attribute.PosixFilePermission; |
| import java.util.Enumeration; |
| |
| import com.google.common.base.Preconditions; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.gson.Gson; |
| import com.google.gson.GsonBuilder; |
| import com.google.gson.internal.StringMap; |
| import com.google.gson.reflect.TypeToken; |
| import org.apache.commons.io.FileUtils; |
| import org.apache.commons.lang.ArrayUtils; |
| import org.apache.commons.lang.NullArgumentException; |
| import org.apache.commons.lang.StringUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.sonatype.aether.RepositoryException; |
| import org.sonatype.aether.repository.Authentication; |
| import org.sonatype.aether.repository.Proxy; |
| import org.sonatype.aether.repository.RemoteRepository; |
| |
| import org.apache.zeppelin.conf.ZeppelinConfiguration; |
| import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; |
| import org.apache.zeppelin.dep.Dependency; |
| import org.apache.zeppelin.dep.DependencyResolver; |
| import org.apache.zeppelin.display.AngularObjectRegistry; |
| import org.apache.zeppelin.display.AngularObjectRegistryListener; |
| import org.apache.zeppelin.helium.ApplicationEventListener; |
| import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter; |
| import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; |
| import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; |
| import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; |
| import org.apache.zeppelin.scheduler.Job; |
| import org.apache.zeppelin.scheduler.Job.Status; |
| |
| /** |
| * Manage interpreters. |
| */ |
| public class InterpreterFactory implements InterpreterGroupFactory { |
| private static final Logger logger = LoggerFactory.getLogger(InterpreterFactory.class); |
| |
| private Map<String, URLClassLoader> cleanCl = |
| Collections.synchronizedMap(new HashMap<String, URLClassLoader>()); |
| |
| private ZeppelinConfiguration conf; |
| |
| private final InterpreterSettingManager interpreterSettingManager; |
| |
| private Gson gson; |
| |
| private AngularObjectRegistryListener angularObjectRegistryListener; |
| private final RemoteInterpreterProcessListener remoteInterpreterProcessListener; |
| private final ApplicationEventListener appEventListener; |
| |
| private boolean shiroEnabled; |
| |
| private Map<String, String> env = new HashMap<>(); |
| |
| private Interpreter devInterpreter; |
| |
/**
 * Builds the factory and registers it with the given setting manager as its
 * {@link InterpreterGroupFactory}.
 *
 * @param conf server configuration
 * @param angularObjectRegistryListener listener passed to every angular object registry
 *     created by this factory
 * @param remoteInterpreterProcessListener listener passed to every remote interpreter
 * @param appEventListener application event listener passed to every remote interpreter
 * @param depResolver not used by this constructor
 * @param shiroEnabled stored and logged
 * @param interpreterSettingManager manager that receives this factory
 */
public InterpreterFactory(ZeppelinConfiguration conf,
    AngularObjectRegistryListener angularObjectRegistryListener,
    RemoteInterpreterProcessListener remoteInterpreterProcessListener,
    ApplicationEventListener appEventListener, DependencyResolver depResolver,
    boolean shiroEnabled, InterpreterSettingManager interpreterSettingManager)
    throws InterpreterException, IOException, RepositoryException {
  // Plain collaborators first.
  this.conf = conf;
  this.shiroEnabled = shiroEnabled;
  this.angularObjectRegistryListener = angularObjectRegistryListener;
  this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
  this.appEventListener = appEventListener;
  this.interpreterSettingManager = interpreterSettingManager;

  this.gson = new GsonBuilder().setPrettyPrinting().create();

  //TODO(jl): Fix it not to use InterpreterGroupFactory
  interpreterSettingManager.setInterpreterGroupFactory(this);

  logger.info("shiroEnabled: {}", shiroEnabled);
}
| |
| /** |
| * @param id interpreterGroup id. Combination of interpreterSettingId + noteId/userId/shared |
| * depends on interpreter mode |
| */ |
| @Override |
| public InterpreterGroup createInterpreterGroup(String id, InterpreterOption option) |
| throws InterpreterException, NullArgumentException { |
| |
| //When called from REST API without option we receive NPE |
| if (option == null) { |
| throw new NullArgumentException("option"); |
| } |
| |
| AngularObjectRegistry angularObjectRegistry; |
| |
| InterpreterGroup interpreterGroup = new InterpreterGroup(id); |
| if (option.isRemote()) { |
| angularObjectRegistry = |
| new RemoteAngularObjectRegistry(id, angularObjectRegistryListener, interpreterGroup); |
| } else { |
| angularObjectRegistry = new AngularObjectRegistry(id, angularObjectRegistryListener); |
| |
| // TODO(moon) : create distributed resource pool for local interpreters and set |
| } |
| |
| interpreterGroup.setAngularObjectRegistry(angularObjectRegistry); |
| return interpreterGroup; |
| } |
| |
| public void createInterpretersForNote(InterpreterSetting interpreterSetting, String user, |
| String noteId, String interpreterSessionKey) { |
| InterpreterGroup interpreterGroup = interpreterSetting.getInterpreterGroup(user, noteId); |
| InterpreterOption option = interpreterSetting.getOption(); |
| Properties properties = (Properties) interpreterSetting.getProperties(); |
| // if interpreters are already there, wait until they're being removed |
| synchronized (interpreterGroup) { |
| long interpreterRemovalWaitStart = System.nanoTime(); |
| // interpreter process supposed to be terminated by RemoteInterpreterProcess.dereference() |
| // in ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT msec. However, if termination of the process and |
| // removal from interpreter group take too long, throw an error. |
| long minTimeout = 10L * 1000 * 1000000; // 10 sec |
| long interpreterRemovalWaitTimeout = Math.max(minTimeout, |
| conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT) * 1000000L * 2); |
| while (interpreterGroup.containsKey(interpreterSessionKey)) { |
| if (System.nanoTime() - interpreterRemovalWaitStart > interpreterRemovalWaitTimeout) { |
| throw new InterpreterException("Can not create interpreter"); |
| } |
| try { |
| interpreterGroup.wait(1000); |
| } catch (InterruptedException e) { |
| logger.debug(e.getMessage(), e); |
| } |
| } |
| } |
| |
| logger.info("Create interpreter instance {} for note {}", interpreterSetting.getName(), noteId); |
| |
| List<InterpreterInfo> interpreterInfos = interpreterSetting.getInterpreterInfos(); |
| String path = interpreterSetting.getPath(); |
| InterpreterRunner runner = interpreterSetting.getInterpreterRunner(); |
| Interpreter interpreter; |
| for (InterpreterInfo info : interpreterInfos) { |
| if (option.isRemote()) { |
| if (option.isExistingProcess()) { |
| interpreter = |
| connectToRemoteRepl(interpreterSessionKey, info.getClassName(), option.getHost(), |
| option.getPort(), properties, interpreterSetting.getId(), user, |
| option.isUserImpersonate); |
| } else { |
| interpreter = createRemoteRepl(path, interpreterSessionKey, info.getClassName(), |
| properties, interpreterSetting.getId(), user, option.isUserImpersonate(), runner); |
| } |
| } else { |
| interpreter = createRepl(interpreterSetting.getPath(), info.getClassName(), properties); |
| } |
| |
| synchronized (interpreterGroup) { |
| List<Interpreter> interpreters = interpreterGroup.get(interpreterSessionKey); |
| if (null == interpreters) { |
| interpreters = new ArrayList<>(); |
| interpreterGroup.put(interpreterSessionKey, interpreters); |
| } |
| if (info.isDefaultInterpreter()) { |
| interpreters.add(0, interpreter); |
| } else { |
| interpreters.add(interpreter); |
| } |
| } |
| logger.info("Interpreter {} {} created", interpreter.getClassName(), interpreter.hashCode()); |
| interpreter.setInterpreterGroup(interpreterGroup); |
| } |
| } |
| |
| private Interpreter createRepl(String dirName, String className, Properties property) |
| throws InterpreterException { |
| logger.info("Create repl {} from {}", className, dirName); |
| |
| ClassLoader oldcl = Thread.currentThread().getContextClassLoader(); |
| try { |
| |
| URLClassLoader ccl = cleanCl.get(dirName); |
| if (ccl == null) { |
| // classloader fallback |
| ccl = URLClassLoader.newInstance(new URL[]{}, oldcl); |
| } |
| |
| boolean separateCL = true; |
| try { // check if server's classloader has driver already. |
| Class cls = this.getClass().forName(className); |
| if (cls != null) { |
| separateCL = false; |
| } |
| } catch (Exception e) { |
| logger.error("exception checking server classloader driver", e); |
| } |
| |
| URLClassLoader cl; |
| |
| if (separateCL == true) { |
| cl = URLClassLoader.newInstance(new URL[]{}, ccl); |
| } else { |
| cl = ccl; |
| } |
| Thread.currentThread().setContextClassLoader(cl); |
| |
| Class<Interpreter> replClass = (Class<Interpreter>) cl.loadClass(className); |
| Constructor<Interpreter> constructor = |
| replClass.getConstructor(new Class[]{Properties.class}); |
| Interpreter repl = constructor.newInstance(property); |
| repl.setClassloaderUrls(ccl.getURLs()); |
| LazyOpenInterpreter intp = new LazyOpenInterpreter(new ClassloaderInterpreter(repl, cl)); |
| return intp; |
| } catch (SecurityException e) { |
| throw new InterpreterException(e); |
| } catch (NoSuchMethodException e) { |
| throw new InterpreterException(e); |
| } catch (IllegalArgumentException e) { |
| throw new InterpreterException(e); |
| } catch (InstantiationException e) { |
| throw new InterpreterException(e); |
| } catch (IllegalAccessException e) { |
| throw new InterpreterException(e); |
| } catch (InvocationTargetException e) { |
| throw new InterpreterException(e); |
| } catch (ClassNotFoundException e) { |
| throw new InterpreterException(e); |
| } finally { |
| Thread.currentThread().setContextClassLoader(oldcl); |
| } |
| } |
| |
| private Interpreter connectToRemoteRepl(String interpreterSessionKey, String className, |
| String host, int port, Properties property, String interpreterSettingId, String userName, |
| Boolean isUserImpersonate) { |
| int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT); |
| int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE); |
| String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + interpreterSettingId; |
| LazyOpenInterpreter intp = new LazyOpenInterpreter( |
| new RemoteInterpreter(property, interpreterSessionKey, className, host, port, localRepoPath, |
| connectTimeout, maxPoolSize, remoteInterpreterProcessListener, appEventListener, |
| userName, isUserImpersonate, conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT))); |
| return intp; |
| } |
| |
| Interpreter createRemoteRepl(String interpreterPath, String interpreterSessionKey, |
| String className, Properties property, String interpreterSettingId, |
| String userName, Boolean isUserImpersonate, InterpreterRunner interpreterRunner) { |
| int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT); |
| String localRepoPath = conf.getInterpreterLocalRepoPath() + "/" + interpreterSettingId; |
| int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE); |
| String interpreterRunnerPath; |
| if (null != interpreterRunner) { |
| interpreterRunnerPath = interpreterRunner.getPath(); |
| Path p = Paths.get(interpreterRunnerPath); |
| if (!p.isAbsolute()) { |
| interpreterRunnerPath = Joiner.on(File.separator) |
| .join(interpreterPath, interpreterRunnerPath); |
| } |
| } else { |
| interpreterRunnerPath = conf.getInterpreterRemoteRunnerPath(); |
| } |
| |
| RemoteInterpreter remoteInterpreter = |
| new RemoteInterpreter(property, interpreterSessionKey, className, |
| interpreterRunnerPath, interpreterPath, localRepoPath, connectTimeout, maxPoolSize, |
| remoteInterpreterProcessListener, appEventListener, userName, isUserImpersonate, |
| conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT), |
| conf.getInterpreterPortRange()); |
| remoteInterpreter.addEnv(env); |
| |
| return new LazyOpenInterpreter(remoteInterpreter); |
| } |
| |
| private List<Interpreter> createOrGetInterpreterList(String user, String noteId, |
| InterpreterSetting setting) { |
| InterpreterGroup interpreterGroup = setting.getInterpreterGroup(user, noteId); |
| synchronized (interpreterGroup) { |
| String interpreterSessionKey = |
| interpreterSettingManager.getInterpreterSessionKey(user, noteId, setting); |
| if (!interpreterGroup.containsKey(interpreterSessionKey)) { |
| createInterpretersForNote(setting, user, noteId, interpreterSessionKey); |
| } |
| return interpreterGroup.get(interpreterSessionKey); |
| } |
| } |
| |
| private InterpreterSetting getInterpreterSettingByGroup(List<InterpreterSetting> settings, |
| String group) { |
| Preconditions.checkNotNull(group, "group should be not null"); |
| |
| for (InterpreterSetting setting : settings) { |
| if (group.equals(setting.getName())) { |
| return setting; |
| } |
| } |
| return null; |
| } |
| |
| private String getInterpreterClassFromInterpreterSetting(InterpreterSetting setting, |
| String name) { |
| Preconditions.checkNotNull(name, "name should be not null"); |
| |
| for (InterpreterInfo info : setting.getInterpreterInfos()) { |
| String infoName = info.getName(); |
| if (null != info.getName() && name.equals(infoName)) { |
| return info.getClassName(); |
| } |
| } |
| return null; |
| } |
| |
| private Interpreter getInterpreter(String user, String noteId, InterpreterSetting setting, |
| String name) { |
| Preconditions.checkNotNull(noteId, "noteId should be not null"); |
| Preconditions.checkNotNull(setting, "setting should be not null"); |
| Preconditions.checkNotNull(name, "name should be not null"); |
| |
| String className; |
| if (null != (className = getInterpreterClassFromInterpreterSetting(setting, name))) { |
| List<Interpreter> interpreterGroup = createOrGetInterpreterList(user, noteId, setting); |
| for (Interpreter interpreter : interpreterGroup) { |
| if (className.equals(interpreter.getClassName())) { |
| return interpreter; |
| } |
| } |
| } |
| return null; |
| } |
| |
| public Interpreter getInterpreter(String user, String noteId, String replName) { |
| List<InterpreterSetting> settings = interpreterSettingManager.getInterpreterSettings(noteId); |
| InterpreterSetting setting; |
| Interpreter interpreter; |
| |
| if (settings == null || settings.size() == 0) { |
| return null; |
| } |
| |
| if (replName == null || replName.trim().length() == 0) { |
| // get default settings (first available) |
| // TODO(jl): Fix it in case of returning null |
| InterpreterSetting defaultSettings = interpreterSettingManager |
| .getDefaultInterpreterSetting(settings); |
| return createOrGetInterpreterList(user, noteId, defaultSettings).get(0); |
| } |
| |
| String[] replNameSplit = replName.split("\\."); |
| if (replNameSplit.length == 2) { |
| String group = null; |
| String name = null; |
| group = replNameSplit[0]; |
| name = replNameSplit[1]; |
| |
| setting = getInterpreterSettingByGroup(settings, group); |
| |
| if (null != setting) { |
| interpreter = getInterpreter(user, noteId, setting, name); |
| |
| if (null != interpreter) { |
| return interpreter; |
| } |
| } |
| |
| throw new InterpreterException(replName + " interpreter not found"); |
| |
| } else { |
| // first assume replName is 'name' of interpreter. ('groupName' is ommitted) |
| // search 'name' from first (default) interpreter group |
| // TODO(jl): Handle with noteId to support defaultInterpreter per note. |
| setting = interpreterSettingManager.getDefaultInterpreterSetting(settings); |
| |
| interpreter = getInterpreter(user, noteId, setting, replName); |
| |
| if (null != interpreter) { |
| return interpreter; |
| } |
| |
| // next, assume replName is 'group' of interpreter ('name' is ommitted) |
| // search interpreter group and return first interpreter. |
| setting = getInterpreterSettingByGroup(settings, replName); |
| |
| if (null != setting) { |
| List<Interpreter> interpreters = createOrGetInterpreterList(user, noteId, setting); |
| if (null != interpreters) { |
| return interpreters.get(0); |
| } |
| } |
| |
| // Support the legacy way to use it |
| for (InterpreterSetting s : settings) { |
| if (s.getGroup().equals(replName)) { |
| List<Interpreter> interpreters = createOrGetInterpreterList(user, noteId, s); |
| if (null != interpreters) { |
| return interpreters.get(0); |
| } |
| } |
| } |
| } |
| |
| return null; |
| } |
| |
| public Map<String, String> getEnv() { |
| return env; |
| } |
| |
| public void setEnv(Map<String, String> env) { |
| this.env = env; |
| } |
| |
| |
| } |