blob: d9bfd68093240574958e8f7d1bc3e27032a55cb0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.tinkerpop.gremlin.spark.structure;
import org.apache.commons.configuration.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.rdd.RDD;
import scala.collection.JavaConversions;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* This is a static cache the prevents Spark from garbage collecting unreferenced RDDs.
*
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public class Spark {
private static SparkContext CONTEXT;
private static final Map<String, RDD<?>> NAME_TO_RDD = new ConcurrentHashMap<>();
private Spark() {
}
public static SparkContext create(final SparkConf sparkConf) {
if (isContextNullOrStopped()) {
sparkConf.setAppName("Apache TinkerPop's Spark-Gremlin");
CONTEXT = SparkContext.getOrCreate(sparkConf);
}
return CONTEXT;
}
public static SparkContext create(org.apache.hadoop.conf.Configuration hadoopConfiguration) {
final SparkConf sparkConfiguration = new SparkConf();
hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
return Spark.create(sparkConfiguration);
}
public static SparkContext create(final Configuration configuration) {
final SparkConf sparkConf = new SparkConf();
configuration.getKeys().forEachRemaining(key -> sparkConf.set(key, configuration.getProperty(key).toString()));
return Spark.create(sparkConf);
}
public static SparkContext create(final String master) {
final SparkConf sparkConf = new SparkConf();
sparkConf.setMaster(master);
return Spark.create(sparkConf);
}
public static SparkContext create(final SparkContext sparkContext) {
if (null != CONTEXT && !CONTEXT.isStopped() && sparkContext !=CONTEXT /*exact the same object => NOP*/
&& !sparkContext.getConf().getBoolean("spark.driver.allowMultipleContexts", false)) {
throw new IllegalStateException(
"Active Spark context exists. Call Spark.close() to close it before creating a new one");
}
CONTEXT = sparkContext;
return CONTEXT;
}
public static SparkContext recreateStopped() {
if (null == CONTEXT)
throw new IllegalStateException("The Spark context has not been created.");
if (!CONTEXT.isStopped())
throw new IllegalStateException("The Spark context is not stopped.");
CONTEXT = SparkContext.getOrCreate(CONTEXT.getConf());
return CONTEXT;
}
public static SparkContext getContext() {
if (null != CONTEXT && CONTEXT.isStopped())
recreateStopped();
return CONTEXT;
}
public static void refresh() {
if (null == CONTEXT)
throw new IllegalStateException("The Spark context has not been created.");
if (CONTEXT.isStopped())
recreateStopped();
final Set<String> keepNames = new HashSet<>();
for (final RDD<?> rdd : JavaConversions.asJavaIterable(CONTEXT.persistentRdds().values())) {
if (null != rdd.name()) {
keepNames.add(rdd.name());
NAME_TO_RDD.put(rdd.name(), rdd);
}
}
// remove all stale names in the NAME_TO_RDD map
NAME_TO_RDD.keySet().stream().filter(key -> !keepNames.contains(key)).collect(Collectors.toList()).forEach(NAME_TO_RDD::remove);
}
public static RDD<?> getRDD(final String name) {
Spark.refresh();
if (!NAME_TO_RDD.containsKey(name))
throw new IllegalArgumentException("The named RDD does not exist in the Spark context: " + name);
return NAME_TO_RDD.get(name);
}
public static boolean hasRDD(final String name) {
Spark.refresh();
return NAME_TO_RDD.containsKey(name);
}
public static List<RDD<?>> getRDDs() {
Spark.refresh();
return new ArrayList<>(NAME_TO_RDD.values());
}
public static void removeRDD(final String name) {
if (null == name)
return;
Spark.refresh();
final RDD<?> rdd = NAME_TO_RDD.remove(name);
if (null != rdd)
rdd.unpersist(true);
}
public static void close() {
NAME_TO_RDD.clear();
if (null != CONTEXT) {
CONTEXT.stop();
// Allow Spark / Hadoop close and finalize() the input streams to its intrenal files.
// This is a well known problem reported in https://issues.apache.org/jira/browse/SPARK-12216
System.gc();
}
CONTEXT = null;
}
public static boolean isContextNullOrStopped() {
return null == CONTEXT || CONTEXT.isStopped();
}
}