blob: 51aeab7b98d98928a6f62d413c3813437d0c314b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.interpreter;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.scheduler.Job;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
/**
* ManagedInterpreterGroup runs under zeppelin server
*/
public class ManagedInterpreterGroup extends InterpreterGroup {
private static final Logger LOGGER = LoggerFactory.getLogger(ManagedInterpreterGroup.class);
private InterpreterSetting interpreterSetting;
private RemoteInterpreterProcess remoteInterpreterProcess; // attached remote interpreter process
private Object interpreterProcessCreationLock = new Object();
/**
* Create InterpreterGroup with given id and interpreterSetting, used in ZeppelinServer
* @param id
* @param interpreterSetting
*/
ManagedInterpreterGroup(String id, InterpreterSetting interpreterSetting) {
super(id);
this.interpreterSetting = interpreterSetting;
}
public InterpreterSetting getInterpreterSetting() {
return interpreterSetting;
}
public RemoteInterpreterProcess getOrCreateInterpreterProcess(String userName,
Properties properties)
throws IOException {
synchronized (interpreterProcessCreationLock) {
if (remoteInterpreterProcess == null) {
LOGGER.info("Create InterpreterProcess for InterpreterGroup: {}", getId());
remoteInterpreterProcess = interpreterSetting.createInterpreterProcess(id, userName,
properties);
remoteInterpreterProcess.start(userName);
remoteInterpreterProcess.init(ZeppelinConfiguration.create());
getInterpreterSetting().getRecoveryStorage()
.onInterpreterClientStart(remoteInterpreterProcess);
}
return remoteInterpreterProcess;
}
}
public RemoteInterpreterProcess getInterpreterProcess() {
return remoteInterpreterProcess;
}
public RemoteInterpreterProcess getRemoteInterpreterProcess() {
return remoteInterpreterProcess;
}
/**
* Close all interpreter instances in this group
*/
@Override
public void close() {
LOGGER.info("Close InterpreterGroup: {}", id);
for (String sessionId : sessions.keySet()) {
close(sessionId);
}
}
/**
* Close all interpreter instances in this session
* @param sessionId
*/
public synchronized void close(String sessionId) {
LOGGER.info("Close Session: {} for interpreter setting: {}", sessionId, interpreterSetting.getName());
close(sessions.remove(sessionId));
//TODO(zjffdu) whether close InterpreterGroup if there's no session left in Zeppelin Server
if (sessions.isEmpty() && interpreterSetting != null) {
LOGGER.info("Remove this InterpreterGroup: {} as all the sessions are closed", id);
interpreterSetting.removeInterpreterGroup(id);
if (remoteInterpreterProcess != null) {
LOGGER.info("Kill RemoteInterpreterProcess");
remoteInterpreterProcess.stop();
try {
interpreterSetting.getRecoveryStorage().onInterpreterClientStop(remoteInterpreterProcess);
} catch (IOException e) {
LOGGER.error("Fail to store recovery data", e);
}
remoteInterpreterProcess = null;
}
}
}
private void close(Collection<Interpreter> interpreters) {
if (interpreters == null) {
return;
}
List<Thread> closeThreads = interpreters.stream()
.map(interpreter -> new Thread(() ->
closeInterpreter(interpreter),
interpreter.getClass().getSimpleName() + "-close"))
.peek(t -> t.setUncaughtExceptionHandler((th, e) ->
LOGGER.error("Interpreter close error", e)))
.peek(Thread::start)
.collect(Collectors.toList());
for (Thread t : closeThreads) {
try {
t.join();
} catch (InterruptedException e) {
LOGGER.error("Can't wait interpreter close threads", e);
Thread.currentThread().interrupt();
break;
}
}
}
private void closeInterpreter(Interpreter interpreter) {
Scheduler scheduler = interpreter.getScheduler();
try {
if (Boolean.parseBoolean(
interpreter.getProperty("zeppelin.interpreter.close.cancel_job", "true"))) {
for (final Job job : scheduler.getAllJobs()) {
if (!job.isTerminated()) {
job.abort();
job.setStatus(Job.Status.ABORT);
LOGGER.info("Job {} aborted ", job.getJobName());
}
}
} else {
LOGGER.info("Keep job running while closing interpreter: {}", interpreter.getClassName());
}
LOGGER.info("Trying to close interpreter {}", interpreter.getClassName());
interpreter.close();
} catch (InterpreterException e) {
LOGGER.warn("Fail to close interpreter {}", interpreter.getClassName(), e);
} finally {
//TODO(zjffdu) move the close of schedule to Interpreter
SchedulerFactory.singleton().removeScheduler(scheduler.getName());
}
}
public synchronized List<Interpreter> getOrCreateSession(String user, String sessionId) {
if (sessions.containsKey(sessionId)) {
return sessions.get(sessionId);
} else {
List<Interpreter> interpreters = interpreterSetting.createInterpreters(user, id, sessionId);
for (Interpreter interpreter : interpreters) {
interpreter.setInterpreterGroup(this);
}
LOGGER.info("Create Session: {} in InterpreterGroup: {} for user: {}", sessionId, id, user);
sessions.put(sessionId, interpreters);
return interpreters;
}
}
public boolean isEmpty() {
return this.sessions.isEmpty();
}
}