// blob: 7eb1eef84470b24664ff441f339e6f098a520446 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.spark;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.zeppelin.interpreter.BaseZeppelinContext;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.r.RInterpreter;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
/**
 * R and SparkR interpreter with visualization support.
 *
 * <p>The Spark handles (contexts, session, Zeppelin context) are not created here; they are
 * obtained in {@link #open()} from the {@link SparkInterpreter} that lives in the same session
 * and are published through {@link ZeppelinRContext}.
 */
public class SparkRInterpreter extends RInterpreter {
  private static final Logger LOGGER = LoggerFactory.getLogger(SparkRInterpreter.class);

  // All of the fields below are populated in open(); they are null/false before that call.
  private SparkInterpreter sparkInterpreter;
  private SparkVersion sparkVersion;
  private boolean isSpark2;
  private SparkContext sc;
  private JavaSparkContext jsc;

  /**
   * Creates the interpreter with the given interpreter properties.
   *
   * @param property interpreter properties, forwarded to the parent constructor
   */
  public SparkRInterpreter(Properties property) {
    super(property);
  }

  /** Always {@code true}: this interpreter is the Spark-enabled flavour of the R interpreter. */
  @Override
  protected boolean isSparkSupported() {
    return true;
  }

  /**
   * Reports whether the detected Spark version supports the secret socket.
   * Reads the version resolved in {@link #open()}.
   */
  @Override
  protected boolean isSecretSupported() {
    return sparkVersion.isSecretSocketSupported();
  }

  /** Returns the numeric form of the version reported by the Spark context. */
  @Override
  protected int sparkVersion() {
    return new SparkVersion(sc.version()).toNumber();
  }

  /**
   * Looks up the {@link SparkInterpreter} of the same session, caches its contexts and version,
   * registers them with {@link ZeppelinRContext}, and then opens the parent interpreter.
   *
   * @throws InterpreterException if the sibling interpreter lookup or the parent open fails
   */
  @Override
  public void open() throws InterpreterException {
    this.sparkInterpreter = getInterpreterInTheSameSessionByClassName(SparkInterpreter.class);
    this.sc = sparkInterpreter.getSparkContext();
    this.jsc = sparkInterpreter.getJavaSparkContext();
    this.sparkVersion = new SparkVersion(sc.version());
    this.isSpark2 = sparkVersion.newerThanEquals(SparkVersion.SPARK_2_0_0);

    ZeppelinRContext.setSparkContext(sc);
    ZeppelinRContext.setJavaSparkContext(jsc);
    // The Spark session is only registered when running on Spark 2.0.0 or newer.
    if (isSpark2) {
      ZeppelinRContext.setSparkSession(sparkInterpreter.getSparkSession());
    }
    ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext());
    ZeppelinRContext.setZeppelinContext(sparkInterpreter.getZeppelinContext());

    // The parent is opened last, after every ZeppelinRContext handle has been set.
    super.open();
  }

  /**
   * Runs the given R code after prepending the statements that tag the Spark jobs it starts.
   *
   * <p>The job group is set on the JVM-side Spark context, and a {@code setJobGroup(...)}
   * statement is prepended to the code. From Spark 2.3.0 on, a
   * {@code setLocalProperty('spark.scheduler.pool', ...)} statement is prepended as well, using
   * the paragraph's {@code pool} local property, or {@code NULL} when none is given.
   *
   * @param lines the R code of the paragraph
   * @param interpreterContext the context of the paragraph being run
   * @return the result produced by the parent interpreter for the augmented code
   * @throws InterpreterException if the parent interpreter fails
   */
  @Override
  public InterpreterResult internalInterpret(String lines, InterpreterContext interpreterContext)
      throws InterpreterException {
    Utils.printDeprecateMessage(sparkInterpreter.getSparkVersion(),
        interpreterContext, properties);

    String jobGroup = Utils.buildJobGroupId(interpreterContext);
    String jobDesc = Utils.buildJobDesc(interpreterContext);
    sparkInterpreter.getSparkContext().setJobGroup(jobGroup, jobDesc, false);

    String setJobGroup;
    // assign setJobGroup to dummy__, otherwise it would print NULL for this statement
    if (isSpark2) {
      // The Spark 2 form takes no explicit sc argument. The description must be passed
      // verbatim: a stray " +" inside the literal used to be prepended to every description.
      setJobGroup = "dummy__ <- setJobGroup(\"" + jobGroup +
          "\", \"" + jobDesc + "\", TRUE)";
    } else {
      setJobGroup = "dummy__ <- setJobGroup(sc, \"" + jobGroup +
          "\", \"" + jobDesc + "\", TRUE)";
    }
    lines = setJobGroup + "\n" + lines;

    if (sparkInterpreter.getSparkVersion().newerThanEquals(SparkVersion.SPARK_2_3_0)) {
      // setLocalProperty is only available from spark 2.3.0
      String setPoolStmt = "setLocalProperty('spark.scheduler.pool', NULL)";
      if (interpreterContext.getLocalProperties().containsKey("pool")) {
        setPoolStmt = "setLocalProperty('spark.scheduler.pool', '" +
            interpreterContext.getLocalProperties().get("pool") + "')";
      }
      // The pool statement ends up before the setJobGroup statement in the final script.
      lines = setPoolStmt + "\n" + lines;
    }
    return super.internalInterpret(lines, interpreterContext);
  }

  /**
   * Closes the parent interpreter first, then the sibling {@link SparkInterpreter} if one is
   * still referenced, and drops that reference.
   *
   * @throws InterpreterException if either close fails
   */
  @Override
  public void close() throws InterpreterException {
    super.close();
    if (this.sparkInterpreter != null) {
      this.sparkInterpreter.close();
      this.sparkInterpreter = null;
    }
  }

  /**
   * Cancels the Spark job group derived from the given paragraph context.
   * Does nothing when no Spark context has been cached yet.
   */
  @Override
  public void cancel(InterpreterContext context) {
    if (this.sc != null) {
      sc.cancelJobGroup(Utils.buildJobGroupId(context));
    }
  }

  /** Returns {@link FormType#NATIVE}. */
  @Override
  public FormType getFormType() {
    return FormType.NATIVE;
  }

  /**
   * Returns the progress reported by the sibling {@link SparkInterpreter}, or {@code 0} when
   * that interpreter is not available.
   *
   * @throws InterpreterException if the sibling interpreter fails to report progress
   */
  @Override
  public int getProgress(InterpreterContext context) throws InterpreterException {
    if (sparkInterpreter != null) {
      return sparkInterpreter.getProgress(context);
    } else {
      return 0;
    }
  }

  /**
   * Returns a FIFO scheduler whose name is built from this class name and this instance's
   * hash code.
   */
  @Override
  public Scheduler getScheduler() {
    return SchedulerFactory.singleton().createOrGetFIFOScheduler(
        SparkRInterpreter.class.getName() + this.hashCode());
  }

  /** Returns the Zeppelin context of the sibling {@link SparkInterpreter}. */
  @Override
  public BaseZeppelinContext getZeppelinContext() {
    return sparkInterpreter.getZeppelinContext();
  }

  /** Code completion is not provided; always returns a new empty list. */
  @Override
  public List<InterpreterCompletion> completion(String buf, int cursor,
                                                InterpreterContext interpreterContext) {
    return new ArrayList<>();
  }
}