/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.livy.rsc.driver;

import java.util.concurrent.TimeUnit;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import org.apache.spark.sql.hive.HiveContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
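
/**
 * Lazily creates and caches the Spark entry points (JavaSparkContext,
 * SparkSession, SQLContext, HiveContext) shared by the RSC driver. Each
 * accessor initializes its entry point at most once via double-checked
 * locking on a volatile field.
 *
 * Illustrative usage (hypothetical caller, not part of this file):
 * <pre>
 *   SparkEntries entries = new SparkEntries(new SparkConf());
 *   JavaSparkContext jsc = entries.sc();              // created on first call
 *   SparkSession spark = entries.sparkSession();      // reuses the same context
 *   entries.stop();
 * </pre>
 */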
public class SparkEntries {

  private static final Logger LOG = LoggerFactory.getLogger(SparkEntries.class);

  private volatile JavaSparkContext sc;
  private final SparkConf conf;
  private volatile SQLContext sqlctx;
  private volatile HiveContext hivectx;
  private volatile SparkSession sparksession;

  public SparkEntries(SparkConf conf) {
    this.conf = conf;
  }
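
  /**
   * Returns the shared JavaSparkContext, creating it on first use and
   * logging how long initialization took.
   */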
  public JavaSparkContext sc() {
    if (sc == null) {
      synchronized (this) {
        if (sc == null) {
          long t1 = System.nanoTime();
          LOG.info("Starting Spark context...");
          SparkContext scalaSc = SparkContext.getOrCreate(conf);
          sc = new JavaSparkContext(scalaSc);
          LOG.info("Spark context finished initialization in {}ms",
            TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t1));
        }
      }
    }
    return sc;
  }
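
  /**
   * Returns the shared SparkSession, creating it on first use. Hive support
   * is enabled when spark.sql.catalogImplementation is "hive" and the Hive
   * classes are present on the classpath; otherwise the session falls back
   * to the in-memory catalog.
   */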
  public SparkSession sparkSession() throws Exception {
    if (sparksession == null) {
      synchronized (this) {
        if (sparksession == null) {
          SparkSession.Builder builder = SparkSession.builder().sparkContext(sc().sc());
          try {
            SparkConf conf = sc().getConf();
            String catalog = conf.get("spark.sql.catalogImplementation", "in-memory").toLowerCase();
            if (catalog.equals("hive") && SparkSession$.MODULE$.hiveClassesArePresent()) {
              ClassLoader loader = Thread.currentThread().getContextClassLoader() != null ?
                Thread.currentThread().getContextClassLoader() : getClass().getClassLoader();
              if (loader.getResource("hive-site.xml") == null) {
                LOG.warn("Hive support is enabled (spark.sql.catalogImplementation=hive) but " +
                  "no hive-site.xml found on classpath.");
              }
              builder.enableHiveSupport();
              sparksession = builder.getOrCreate();
              LOG.info("Created Spark session (with Hive support).");
            } else {
              builder.config("spark.sql.catalogImplementation", "in-memory");
              sparksession = builder.getOrCreate();
              LOG.info("Created Spark session.");
            }
          } catch (Exception e) {
            LOG.warn("Failed to create SparkSession.", e);
            throw e;
          }
        }
      }
    }
    return sparksession;
  }
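
  /** Returns the shared SQLContext, creating it from the JavaSparkContext on first use. */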
  public SQLContext sqlctx() {
    if (sqlctx == null) {
      synchronized (this) {
        if (sqlctx == null) {
          sqlctx = new SQLContext(sc());
          LOG.info("Created SQLContext.");
        }
      }
    }
    return sqlctx;
  }
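
  /**
   * Returns the shared HiveContext, creating it on first use when Hive support
   * is requested via spark.repl.enableHiveContext or
   * spark.sql.catalogImplementation=hive. Returns null when Hive support is
   * not requested.
   */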
  public HiveContext hivectx() {
    if (hivectx == null) {
      synchronized (this) {
        if (hivectx == null) {
          // Go through sc() rather than the raw field so the Spark context is
          // created if this is the first entry point to be accessed.
          SparkConf conf = sc().getConf();
          if (conf.getBoolean("spark.repl.enableHiveContext", false) ||
              conf.get("spark.sql.catalogImplementation", "in-memory").toLowerCase()
                .equals("hive")) {
            ClassLoader loader = Thread.currentThread().getContextClassLoader() != null ?
              Thread.currentThread().getContextClassLoader() : getClass().getClassLoader();
            if (loader.getResource("hive-site.xml") == null) {
              LOG.warn("livy.repl.enable-hive-context is true but no hive-site.xml found on " +
                "classpath.");
            }
            hivectx = new HiveContext(sc().sc());
            LOG.info("Created HiveContext.");
          }
        }
      }
    }
    return hivectx;
  }
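
  /** Stops the underlying SparkContext if it was started. */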
  public synchronized void stop() {
    if (sc != null) {
      sc.stop();
    }
  }
}