/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysml.runtime.controlprogram.context;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.storage.RDDInfo;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.util.LongAccumulator;
import org.apache.sysml.api.DMLScript;
import org.apache.sysml.api.DMLScript.RUNTIME_PLATFORM;
import org.apache.sysml.api.mlcontext.MLContext;
import org.apache.sysml.api.mlcontext.MLContextUtil;
import org.apache.sysml.conf.ConfigurationManager;
import org.apache.sysml.hops.OptimizerUtils;
import org.apache.sysml.lops.Checkpoint;
import org.apache.sysml.parser.Expression.ValueType;
import org.apache.sysml.runtime.DMLRuntimeException;
import org.apache.sysml.runtime.compress.CompressedMatrixBlock;
import org.apache.sysml.runtime.controlprogram.Program;
import org.apache.sysml.runtime.controlprogram.caching.CacheableData;
import org.apache.sysml.runtime.controlprogram.caching.FrameObject;
import org.apache.sysml.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysml.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysml.runtime.instructions.cp.Data;
import org.apache.sysml.runtime.instructions.spark.data.BroadcastObject;
import org.apache.sysml.runtime.instructions.spark.data.LineageObject;
import org.apache.sysml.runtime.instructions.spark.data.PartitionedBlock;
import org.apache.sysml.runtime.instructions.spark.data.PartitionedBroadcast;
import org.apache.sysml.runtime.instructions.spark.data.RDDObject;
import org.apache.sysml.runtime.instructions.spark.functions.ComputeBinaryBlockNnzFunction;
import org.apache.sysml.runtime.instructions.spark.functions.CopyBinaryCellFunction;
import org.apache.sysml.runtime.instructions.spark.functions.CopyFrameBlockPairFunction;
import org.apache.sysml.runtime.instructions.spark.functions.CopyTextInputFunction;
import org.apache.sysml.runtime.instructions.spark.functions.CreateSparseBlockFunction;
import org.apache.sysml.runtime.instructions.spark.utils.FrameRDDConverterUtils.LongFrameToLongWritableFrameFunction;
import org.apache.sysml.runtime.instructions.spark.utils.RDDAggregateUtils;
import org.apache.sysml.runtime.instructions.spark.utils.SparkUtils;
import org.apache.sysml.runtime.matrix.MatrixCharacteristics;
import org.apache.sysml.runtime.matrix.data.FrameBlock;
import org.apache.sysml.runtime.matrix.data.InputInfo;
import org.apache.sysml.runtime.matrix.data.MatrixBlock;
import org.apache.sysml.runtime.matrix.data.MatrixCell;
import org.apache.sysml.runtime.matrix.data.MatrixIndexes;
import org.apache.sysml.runtime.matrix.data.OutputInfo;
import org.apache.sysml.runtime.matrix.data.SparseBlock;
import org.apache.sysml.runtime.matrix.mapred.MRJobConfiguration;
import org.apache.sysml.runtime.util.MapReduceTool;
import org.apache.sysml.runtime.util.UtilFunctions;
import org.apache.sysml.utils.MLContextProxy;
import org.apache.sysml.utils.Statistics;
import scala.Tuple2;
public class SparkExecutionContext extends ExecutionContext
{
private static final Log LOG = LogFactory.getLog(SparkExecutionContext.class.getName());
private static final boolean LDEBUG = false; //local debug flag
//internal configurations
private static boolean LAZY_SPARKCTX_CREATION = true;
private static boolean ASYNCHRONOUS_VAR_DESTROY = true;
public static boolean FAIR_SCHEDULER_MODE = true;
//executor memory and relative fractions as obtained from the spark configuration
private static SparkClusterConfig _sconf = null;
//singleton spark context (as there can be only one spark context per JVM)
private static JavaSparkContext _spctx = null;
//registry of parallelized RDDs to enforce that at any time, we spend at most
//10% of JVM max heap size for parallelized RDDs; if this is not sufficient,
//matrices or frames are exported to HDFS and the RDDs are created from files.
//TODO unify memory management for CP, par RDDs, and potentially broadcasts
private static MemoryManagerParRDDs _parRDDs = new MemoryManagerParRDDs(0.1);
static {
// for internal debugging only
if( LDEBUG ) {
Logger.getLogger("org.apache.sysml.runtime.controlprogram.context")
.setLevel((Level) Level.DEBUG);
}
}
protected SparkExecutionContext(boolean allocateVars, Program prog)
{
//protected constructor to force use of ExecutionContextFactory
super( allocateVars, prog );
//spark context creation via internal initializer
if( !LAZY_SPARKCTX_CREATION || DMLScript.rtplatform==RUNTIME_PLATFORM.SPARK ) {
initSparkContext();
}
}
/**
* Returns the singleton spark context in use. In case of lazy spark context
* creation, this method blocks until the spark context is created.
*
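* <p>Illustrative usage (a sketch; {@code sec} denotes a hypothetical
* {@code SparkExecutionContext} created via the ExecutionContextFactory):
* <pre>{@code
* JavaSparkContext jsc = sec.getSparkContext();
* boolean local = jsc.isLocal();
* }</pre>
*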
* @return java spark context
*/
public JavaSparkContext getSparkContext()
{
//lazy spark context creation on demand (lazy instead of asynchronous
//to avoid waiting for an uninitialized spark context on close)
if( LAZY_SPARKCTX_CREATION ) {
initSparkContext();
}
//return the created spark context
return _spctx;
}
public static JavaSparkContext getSparkContextStatic() {
initSparkContext();
return _spctx;
}
/**
* Indicates if the spark context has been created or has
* been passed in from outside.
*
* @return true if spark context created
*/
public synchronized static boolean isSparkContextCreated() {
return (_spctx != null);
}
public static void resetSparkContextStatic() {
_spctx = null;
}
public void close()
{
synchronized( SparkExecutionContext.class ) {
if( _spctx != null )
{
//stop the spark context if it exists
_spctx.stop();
//make sure stopped context is never used again
_spctx = null;
}
}
}
public static boolean isLazySparkContextCreation(){
return LAZY_SPARKCTX_CREATION;
}
private synchronized static void initSparkContext()
{
//check for redundant spark context init
if( _spctx != null )
return;
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
//create a default spark context (master, appname, etc refer to system properties
//as given in the spark configuration or during spark-submit)
MLContext mlCtxObj = MLContextProxy.getActiveMLContext();
if(mlCtxObj != null)
{
// This is the case when DML is invoked through the spark shell
// The passing of static variables will be cleaned up later, as the current approach requires minimal changes to DMLScript
_spctx = MLContextUtil.getJavaSparkContext(mlCtxObj);
}
else
{
if(DMLScript.USE_LOCAL_SPARK_CONFIG) {
// Local master (all available cores) for integration testing
SparkConf conf = createSystemMLSparkConf()
.setMaster("local[*]").setAppName("My local integration test app");
// This is discouraged in Spark but was added only for test cases that cannot stop the context properly
// conf.set("spark.driver.allowMultipleContexts", "true");
conf.set("spark.ui.enabled", "false");
_spctx = new JavaSparkContext(conf);
}
else //default cluster setup
{
//setup systemml-preferred spark configuration (w/o user choice)
SparkConf conf = createSystemMLSparkConf();
_spctx = new JavaSparkContext(conf);
}
_parRDDs.clear();
}
// Warn if spark.driver.maxResultSize is not set appropriately; it needs to be set before starting the Spark context to support CP collect
String strDriverMaxResSize = _spctx.getConf().get("spark.driver.maxResultSize", "1g");
long driverMaxResSize = UtilFunctions.parseMemorySize(strDriverMaxResSize);
if (driverMaxResSize != 0 && driverMaxResSize<OptimizerUtils.getLocalMemBudget() && !DMLScript.USE_LOCAL_SPARK_CONFIG)
LOG.warn("Configuration parameter spark.driver.maxResultSize set to " + UtilFunctions.formatMemorySize(driverMaxResSize) + "."
+ " You can set it through Spark default configuration setting either to 0 (unlimited) or to available memory budget of size "
+ UtilFunctions.formatMemorySize((long)OptimizerUtils.getLocalMemBudget()) + ".");
//globally add binaryblock serialization framework for all hdfs read/write operations
//TODO if spark context passed in from outside (mlcontext), we need to clean this up at the end
if( MRJobConfiguration.USE_BINARYBLOCK_SERIALIZATION )
MRJobConfiguration.addBinaryBlockSerializationFramework( _spctx.hadoopConfiguration() );
//statistics maintenance
if( DMLScript.STATISTICS ){
Statistics.setSparkCtxCreateTime(System.nanoTime()-t0);
}
}
/**
* Sets up a SystemML-preferred Spark configuration based on the implicit
* default configuration (as passed via configurations from outside).
*
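* <p>Illustrative usage for a standalone driver (a sketch; the app name is hypothetical):
* <pre>{@code
* SparkConf conf = SparkExecutionContext.createSystemMLSparkConf()
*     .setAppName("MySystemMLApp");
* JavaSparkContext jsc = new JavaSparkContext(conf);
* }</pre>
*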
* @return spark configuration
*/
public static SparkConf createSystemMLSparkConf() {
SparkConf conf = new SparkConf();
//always set unlimited result size (required for cp collect)
conf.set("spark.driver.maxResultSize", "0");
//always use the fair scheduler (for single jobs, it's equivalent to fifo
//but for concurrent jobs in parfor it ensures better data locality because
//round robin assignment mitigates the problem of 'sticky slots')
if( FAIR_SCHEDULER_MODE ) {
conf.set("spark.scheduler.mode", "FAIR");
}
//increase locality wait (usually more robust due to better data locality)
if( !conf.contains("spark.locality.wait") ) { //default 3s
conf.set("spark.locality.wait", "5s");
}
//increase max message size for robustness
String sparkVersion = org.apache.spark.package$.MODULE$.SPARK_VERSION();
String msgSizeConf = (UtilFunctions.compareVersion(sparkVersion, "2.0.0") < 0) ?
"spark.akka.frameSize" : "spark.rpc.message.maxSize";
if( !conf.contains(msgSizeConf) ) { //default 128MB
conf.set(msgSizeConf, "512");
}
return conf;
}
public static boolean isLocalMaster() {
return getSparkContextStatic().isLocal();
}
/**
* Spark instructions should call this for all matrix inputs except broadcast
* variables.
*
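* <p>Illustrative usage from a Spark instruction (a sketch; the context {@code sec}
* and the variable name {@code "_mVar1"} are hypothetical):
* <pre>{@code
* JavaPairRDD<MatrixIndexes, MatrixBlock> in =
*     sec.getBinaryBlockRDDHandleForVariable("_mVar1");
* long numBlocks = in.count();
* }</pre>
*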
* @param varname variable name
* @return JavaPairRDD of MatrixIndexes-MatrixBlocks
* @throws DMLRuntimeException if DMLRuntimeException occurs
*/
@SuppressWarnings("unchecked")
public JavaPairRDD<MatrixIndexes,MatrixBlock> getBinaryBlockRDDHandleForVariable( String varname )
throws DMLRuntimeException
{
return (JavaPairRDD<MatrixIndexes,MatrixBlock>) getRDDHandleForVariable( varname, InputInfo.BinaryBlockInputInfo);
}
/**
* Spark instructions should call this for all frame inputs except broadcast
* variables.
*
* @param varname variable name
* @return JavaPairRDD of Longs-FrameBlocks
* @throws DMLRuntimeException if DMLRuntimeException occurs
*/
@SuppressWarnings("unchecked")
public JavaPairRDD<Long,FrameBlock> getFrameBinaryBlockRDDHandleForVariable( String varname )
throws DMLRuntimeException
{
JavaPairRDD<Long,FrameBlock> out = (JavaPairRDD<Long,FrameBlock>) getRDDHandleForVariable( varname, InputInfo.BinaryBlockInputInfo);
return out;
}
public JavaPairRDD<?,?> getRDDHandleForVariable( String varname, InputInfo inputInfo )
throws DMLRuntimeException
{
Data dat = getVariable(varname);
if( dat instanceof MatrixObject ) {
MatrixObject mo = getMatrixObject(varname);
return getRDDHandleForMatrixObject(mo, inputInfo);
}
else if( dat instanceof FrameObject ) {
FrameObject fo = getFrameObject(varname);
return getRDDHandleForFrameObject(fo, inputInfo);
}
else {
throw new DMLRuntimeException("Failed to obtain RDD for data type other than matrix or frame.");
}
}
/**
* This call returns an RDD handle for a given matrix object. This includes
* the creation of RDDs for in-memory or binary-block HDFS data.
*
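* <p>Illustrative usage (a sketch; the context {@code sec} and the variable name
* {@code "X"} are hypothetical):
* <pre>{@code
* MatrixObject mo = sec.getMatrixObject("X");
* JavaPairRDD<?, ?> rdd = sec.getRDDHandleForMatrixObject(mo, InputInfo.BinaryBlockInputInfo);
* }</pre>
*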
* @param mo matrix object
* @param inputInfo input info
* @return JavaPairRDD handle for a matrix object
* @throws DMLRuntimeException if DMLRuntimeException occurs
*/
@SuppressWarnings("unchecked")
public JavaPairRDD<?,?> getRDDHandleForMatrixObject( MatrixObject mo, InputInfo inputInfo )
throws DMLRuntimeException
{
//NOTE: MB this logic should be integrated into MatrixObject
//However, for now we cannot assume that spark libraries are
//always available and hence only store generic references in
//the matrix object while all the logic remains in the SparkExecutionContext
JavaSparkContext sc = getSparkContext();
JavaPairRDD<?,?> rdd = null;
//CASE 1: rdd already exists (reuse if checkpoint, or trigger
//pending rdd operations if not yet cached, but prevent re-evaluation
//of rdd operations if already executed and cached)
if( mo.getRDDHandle()!=null
&& (mo.getRDDHandle().isCheckpointRDD() || !mo.isCached(false)) )
{
//return existing rdd handle (w/o input format change)
rdd = mo.getRDDHandle().getRDD();
}
//CASE 2: dirty in memory data or cached result of rdd operations
else if( mo.isDirty() || mo.isCached(false) )
{
//get in-memory matrix block and parallelize it
//w/ guarded parallelize (fallback to export, rdd from file if too large)
MatrixCharacteristics mc = mo.getMatrixCharacteristics();
boolean fromFile = false;
if( !OptimizerUtils.checkSparkCollectMemoryBudget(mc, 0) || !_parRDDs.reserve(
OptimizerUtils.estimatePartitionedSizeExactSparsity(mc))) {
if( mo.isDirty() || !mo.isHDFSFileExists() ) //write if necessary
mo.exportData();
rdd = sc.hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass);
rdd = SparkUtils.copyBinaryBlockMatrix((JavaPairRDD<MatrixIndexes, MatrixBlock>)rdd); //cp is workaround for read bug
fromFile = true;
}
else { //default case
MatrixBlock mb = mo.acquireRead(); //pin matrix in memory
rdd = toMatrixJavaPairRDD(sc, mb, (int)mo.getNumRowsPerBlock(), (int)mo.getNumColumnsPerBlock());
mo.release(); //unpin matrix
_parRDDs.registerRDD(rdd.id(), OptimizerUtils.estimatePartitionedSizeExactSparsity(mc), true);
}
//keep rdd handle for future operations on it
RDDObject rddhandle = new RDDObject(rdd);
rddhandle.setHDFSFile(fromFile);
rddhandle.setParallelizedRDD(!fromFile);
mo.setRDDHandle(rddhandle);
}
//CASE 3: non-dirty (file exists on HDFS)
else
{
// parallelize hdfs-resident file
// For binary block, these are: SequenceFileInputFormat.class, MatrixIndexes.class, MatrixBlock.class
if(inputInfo == InputInfo.BinaryBlockInputInfo) {
rdd = sc.hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass);
//note: this copy is still required in Spark 1.4 because spark hands out whatever the inputformat
//recordreader returns; the javadoc explicitly recommends copying all key/value pairs
rdd = SparkUtils.copyBinaryBlockMatrix((JavaPairRDD<MatrixIndexes, MatrixBlock>)rdd); //cp is workaround for read bug
}
else if(inputInfo == InputInfo.TextCellInputInfo || inputInfo == InputInfo.CSVInputInfo || inputInfo == InputInfo.MatrixMarketInputInfo) {
rdd = sc.hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass);
rdd = ((JavaPairRDD<LongWritable, Text>)rdd).mapToPair( new CopyTextInputFunction() ); //cp is workaround for read bug
}
else if(inputInfo == InputInfo.BinaryCellInputInfo) {
rdd = sc.hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass);
rdd = ((JavaPairRDD<MatrixIndexes, MatrixCell>)rdd).mapToPair( new CopyBinaryCellFunction() ); //cp is workaround for read bug
}
else {
throw new DMLRuntimeException("Incorrect input format in getRDDHandleForVariable");
}
//keep rdd handle for future operations on it
RDDObject rddhandle = new RDDObject(rdd);
rddhandle.setHDFSFile(true);
mo.setRDDHandle(rddhandle);
}
return rdd;
}
/**
* FIXME: this implementation currently assumes a matrix representation but a frame signature
* in order to support the old transform implementation.
*
* @param fo frame object
* @param inputInfo input info
* @return JavaPairRDD handle for a frame object
* @throws DMLRuntimeException if DMLRuntimeException occurs
*/
@SuppressWarnings("unchecked")
public JavaPairRDD<?,?> getRDDHandleForFrameObject( FrameObject fo, InputInfo inputInfo )
throws DMLRuntimeException
{
//NOTE: MB this logic should be integrated into FrameObject
//However, for now we cannot assume that spark libraries are
//always available and hence only store generic references in
//the frame object while all the logic remains in the SparkExecutionContext
InputInfo inputInfo2 = (inputInfo==InputInfo.BinaryBlockInputInfo) ?
InputInfo.BinaryBlockFrameInputInfo : inputInfo;
JavaSparkContext sc = getSparkContext();
JavaPairRDD<?,?> rdd = null;
//CASE 1: rdd already exists (reuse if checkpoint, or trigger
//pending rdd operations if not yet cached, but prevent re-evaluation
//of rdd operations if already executed and cached)
if( fo.getRDDHandle()!=null
&& (fo.getRDDHandle().isCheckpointRDD() || !fo.isCached(false)) )
{
//return existing rdd handle (w/o input format change)
rdd = fo.getRDDHandle().getRDD();
}
//CASE 2: dirty in memory data or cached result of rdd operations
else if( fo.isDirty() || fo.isCached(false) )
{
//get in-memory frame block and parallelize it
//w/ guarded parallelize (fallback to export, rdd from file if too large)
MatrixCharacteristics mc = fo.getMatrixCharacteristics();
boolean fromFile = false;
if( !OptimizerUtils.checkSparkCollectMemoryBudget(mc, 0) || !_parRDDs.reserve(
OptimizerUtils.estimatePartitionedSizeExactSparsity(mc)) ) {
if( fo.isDirty() ) { //write only if necessary
fo.exportData();
}
rdd = sc.hadoopFile( fo.getFileName(), inputInfo2.inputFormatClass, inputInfo2.inputKeyClass, inputInfo2.inputValueClass);
rdd = ((JavaPairRDD<LongWritable, FrameBlock>)rdd).mapToPair( new CopyFrameBlockPairFunction() ); //cp is workaround for read bug
fromFile = true;
}
else { //default case
FrameBlock fb = fo.acquireRead(); //pin frame in memory
rdd = toFrameJavaPairRDD(sc, fb);
fo.release(); //unpin frame
_parRDDs.registerRDD(rdd.id(), OptimizerUtils.estimatePartitionedSizeExactSparsity(mc), true);
}
//keep rdd handle for future operations on it
RDDObject rddhandle = new RDDObject(rdd);
rddhandle.setHDFSFile(fromFile);
fo.setRDDHandle(rddhandle);
}
//CASE 3: non-dirty (file exists on HDFS)
else
{
// parallelize hdfs-resident file
// For binary block, these are: SequenceFileInputFormat.class, MatrixIndexes.class, MatrixBlock.class
if(inputInfo2 == InputInfo.BinaryBlockFrameInputInfo) {
rdd = sc.hadoopFile( fo.getFileName(), inputInfo2.inputFormatClass, inputInfo2.inputKeyClass, inputInfo2.inputValueClass);
//note: this copy is still required in Spark 1.4 because spark hands out whatever the inputformat
//recordreader returns; the javadoc explicitly recommends copying all key/value pairs
rdd = ((JavaPairRDD<LongWritable, FrameBlock>)rdd).mapToPair( new CopyFrameBlockPairFunction() ); //cp is workaround for read bug
}
else if(inputInfo2 == InputInfo.TextCellInputInfo || inputInfo2 == InputInfo.CSVInputInfo || inputInfo2 == InputInfo.MatrixMarketInputInfo) {
rdd = sc.hadoopFile( fo.getFileName(), inputInfo2.inputFormatClass, inputInfo2.inputKeyClass, inputInfo2.inputValueClass);
rdd = ((JavaPairRDD<LongWritable, Text>)rdd).mapToPair( new CopyTextInputFunction() ); //cp is workaround for read bug
}
else if(inputInfo2 == InputInfo.BinaryCellInputInfo) {
throw new DMLRuntimeException("Binarycell not supported for frames.");
}
else {
throw new DMLRuntimeException("Incorrect input format in getRDDHandleForVariable");
}
//keep rdd handle for future operations on it
RDDObject rddhandle = new RDDObject(rdd);
rddhandle.setHDFSFile(true);
fo.setRDDHandle(rddhandle);
}
return rdd;
}
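/**
* Creates (or reuses) a partitioned broadcast of the given matrix variable. See the
* note in the implementation regarding the memory requirements of this call.
*
* <p>Illustrative usage (a sketch; the context {@code sec} and the variable name
* {@code "w"} are hypothetical):
* <pre>{@code
* PartitionedBroadcast<MatrixBlock> bc = sec.getBroadcastForVariable("w");
* }</pre>
*
* @param varname variable name
* @return partitioned broadcast of the matrix variable
* @throws DMLRuntimeException if DMLRuntimeException occurs
*/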
@SuppressWarnings("unchecked")
public PartitionedBroadcast<MatrixBlock> getBroadcastForVariable( String varname )
throws DMLRuntimeException
{
//NOTE: The memory consumption of this method is the in-memory size of the
//matrix object plus the partitioned size in 1k-1k blocks. Since the call
//to broadcast happens after the matrix object has been released, the memory
//requirements of blockified chunks in Spark's block manager are covered under
//this maximum. Also note that we explicitly clear the in-memory blocks once
//the broadcasts are created (other than in local mode) in order to avoid
//unnecessary memory requirements during the lifetime of this broadcast handle.
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
MatrixObject mo = getMatrixObject(varname);
PartitionedBroadcast<MatrixBlock> bret = null;
//reuse existing broadcast handle
if( mo.getBroadcastHandle()!=null
&& mo.getBroadcastHandle().isValid() )
{
bret = mo.getBroadcastHandle().getBroadcast();
}
//create new broadcast handle (if never created or evicted)
if( bret == null )
{
//account for overwritten invalid broadcast (e.g., evicted)
if( mo.getBroadcastHandle()!=null )
CacheableData.addBroadcastSize(-mo.getBroadcastHandle().getSize());
//obtain meta data for matrix
int brlen = (int) mo.getNumRowsPerBlock();
int bclen = (int) mo.getNumColumnsPerBlock();
//create partitioned matrix block and release memory consumed by input
MatrixBlock mb = mo.acquireRead();
PartitionedBlock<MatrixBlock> pmb = new PartitionedBlock<>(mb, brlen, bclen);
mo.release();
//determine coarse-grained partitioning
int numPerPart = PartitionedBroadcast.computeBlocksPerPartition(mo.getNumRows(), mo.getNumColumns(), brlen, bclen);
int numParts = (int) Math.ceil((double)pmb.getNumRowBlocks()*pmb.getNumColumnBlocks() / numPerPart);
Broadcast<PartitionedBlock<MatrixBlock>>[] ret = new Broadcast[numParts];
//create coarse-grained partitioned broadcasts
if( numParts > 1 ) {
for( int i=0; i<numParts; i++ ) {
int offset = i * numPerPart;
int numBlks = Math.min(numPerPart, pmb.getNumRowBlocks()*pmb.getNumColumnBlocks()-offset);
PartitionedBlock<MatrixBlock> tmp = pmb.createPartition(offset, numBlks, new MatrixBlock());
ret[i] = getSparkContext().broadcast(tmp);
if( !isLocalMaster() )
tmp.clearBlocks();
}
}
else { //single partition
ret[0] = getSparkContext().broadcast(pmb);
if( !isLocalMaster() )
pmb.clearBlocks();
}
bret = new PartitionedBroadcast<>(ret, mo.getMatrixCharacteristics());
BroadcastObject<MatrixBlock> bchandle = new BroadcastObject<>(bret,
OptimizerUtils.estimatePartitionedSizeExactSparsity(mo.getMatrixCharacteristics()));
mo.setBroadcastHandle(bchandle);
CacheableData.addBroadcastSize(bchandle.getSize());
}
if (DMLScript.STATISTICS) {
Statistics.accSparkBroadCastTime(System.nanoTime() - t0);
Statistics.incSparkBroadcastCount(1);
}
return bret;
}
@SuppressWarnings("unchecked")
public PartitionedBroadcast<FrameBlock> getBroadcastForFrameVariable( String varname)
throws DMLRuntimeException
{
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
FrameObject fo = getFrameObject(varname);
PartitionedBroadcast<FrameBlock> bret = null;
//reuse existing broadcast handle
if( fo.getBroadcastHandle()!=null
&& fo.getBroadcastHandle().isValid() )
{
bret = fo.getBroadcastHandle().getBroadcast();
}
//create new broadcast handle (if never created or evicted)
if( bret == null )
{
//account for overwritten invalid broadcast (e.g., evicted)
if( fo.getBroadcastHandle()!=null )
CacheableData.addBroadcastSize(-fo.getBroadcastHandle().getSize());
//obtain meta data for frame
int bclen = (int) fo.getNumColumns();
int brlen = OptimizerUtils.getDefaultFrameSize();
//create partitioned frame block and release memory consumed by input
FrameBlock mb = fo.acquireRead();
PartitionedBlock<FrameBlock> pmb = new PartitionedBlock<>(mb, brlen, bclen);
fo.release();
//determine coarse-grained partitioning
int numPerPart = PartitionedBroadcast.computeBlocksPerPartition(fo.getNumRows(), fo.getNumColumns(), brlen, bclen);
int numParts = (int) Math.ceil((double)pmb.getNumRowBlocks()*pmb.getNumColumnBlocks() / numPerPart);
Broadcast<PartitionedBlock<FrameBlock>>[] ret = new Broadcast[numParts];
//create coarse-grained partitioned broadcasts
if( numParts > 1 ) {
for( int i=0; i<numParts; i++ ) {
int offset = i * numPerPart;
int numBlks = Math.min(numPerPart, pmb.getNumRowBlocks()*pmb.getNumColumnBlocks()-offset);
PartitionedBlock<FrameBlock> tmp = pmb.createPartition(offset, numBlks, new FrameBlock());
ret[i] = getSparkContext().broadcast(tmp);
if( !isLocalMaster() )
tmp.clearBlocks();
}
}
else { //single partition
ret[0] = getSparkContext().broadcast(pmb);
if( !isLocalMaster() )
pmb.clearBlocks();
}
bret = new PartitionedBroadcast<>(ret, fo.getMatrixCharacteristics());
BroadcastObject<FrameBlock> bchandle = new BroadcastObject<>(bret,
OptimizerUtils.estimatePartitionedSizeExactSparsity(fo.getMatrixCharacteristics()));
fo.setBroadcastHandle(bchandle);
CacheableData.addBroadcastSize(bchandle.getSize());
}
if (DMLScript.STATISTICS) {
Statistics.accSparkBroadCastTime(System.nanoTime() - t0);
Statistics.incSparkBroadcastCount(1);
}
return bret;
}
/**
* Keep the output rdd of spark rdd operations as meta data of matrix/frame
* objects in the symbol table.
*
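* <p>Illustrative usage at the end of a Spark instruction (a sketch; the context
* {@code sec}, the output RDD {@code out}, and the variable names are hypothetical):
* <pre>{@code
* sec.setRDDHandleForVariable("_mVar2", out);
* sec.addLineageRDD("_mVar2", "_mVar1"); //keep lineage to the input for cleanup
* }</pre>
*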
* @param varname variable name
* @param rdd JavaPairRDD handle for variable
* @throws DMLRuntimeException if DMLRuntimeException occurs
*/
public void setRDDHandleForVariable(String varname, JavaPairRDD<?,?> rdd)
throws DMLRuntimeException
{
CacheableData<?> obj = getCacheableData(varname);
RDDObject rddhandle = new RDDObject(rdd);
obj.setRDDHandle( rddhandle );
}
/**
* Utility method for creating an RDD out of an in-memory matrix block.
*
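* <p>Illustrative usage (a sketch; {@code mb} is assumed to be an in-memory
* {@code MatrixBlock}, and 1000 x 1000 are the usual SystemML default block sizes):
* <pre>{@code
* JavaSparkContext jsc = SparkExecutionContext.getSparkContextStatic();
* JavaPairRDD<MatrixIndexes, MatrixBlock> rdd =
*     SparkExecutionContext.toMatrixJavaPairRDD(jsc, mb, 1000, 1000);
* }</pre>
*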
* @param sc java spark context
* @param src matrix block
* @param brlen block row length
* @param bclen block column length
* @return JavaPairRDD handle to matrix block
* @throws DMLRuntimeException if DMLRuntimeException occurs
*/
public static JavaPairRDD<MatrixIndexes,MatrixBlock> toMatrixJavaPairRDD(JavaSparkContext sc, MatrixBlock src, int brlen, int bclen)
throws DMLRuntimeException
{
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
List<Tuple2<MatrixIndexes,MatrixBlock>> list = null;
if( src.getNumRows() <= brlen && src.getNumColumns() <= bclen ) {
list = Arrays.asList(new Tuple2<>(new MatrixIndexes(1,1), src));
}
else {
MatrixCharacteristics mc = new MatrixCharacteristics(
src.getNumRows(), src.getNumColumns(), brlen, bclen, src.getNonZeros());
list = LongStream.range(0, mc.getNumBlocks()).parallel()
.mapToObj(i -> createIndexedBlock(src, mc, i))
.collect(Collectors.toList());
}
JavaPairRDD<MatrixIndexes,MatrixBlock> result = sc.parallelizePairs(list);
if (DMLScript.STATISTICS) {
Statistics.accSparkParallelizeTime(System.nanoTime() - t0);
Statistics.incSparkParallelizeCount(1);
}
return result;
}
private static Tuple2<MatrixIndexes,MatrixBlock> createIndexedBlock(MatrixBlock mb, MatrixCharacteristics mc, long ix) {
try {
//compute block indexes
long blockRow = ix / mc.getNumColBlocks();
long blockCol = ix % mc.getNumColBlocks();
//compute block sizes
int maxRow = UtilFunctions.computeBlockSize(mc.getRows(), blockRow+1, mc.getRowsPerBlock());
int maxCol = UtilFunctions.computeBlockSize(mc.getCols(), blockCol+1, mc.getColsPerBlock());
//copy sub-matrix to block
MatrixBlock block = new MatrixBlock(maxRow, maxCol, mb.isInSparseFormat());
int row_offset = (int)blockRow*mc.getRowsPerBlock();
int col_offset = (int)blockCol*mc.getColsPerBlock();
block = mb.slice( row_offset, row_offset+maxRow-1,
col_offset, col_offset+maxCol-1, block );
//create key-value pair
return new Tuple2<>(new MatrixIndexes(blockRow+1, blockCol+1), block);
}
catch(DMLRuntimeException ex) {
throw new RuntimeException(ex);
}
}
public static JavaPairRDD<Long,FrameBlock> toFrameJavaPairRDD(JavaSparkContext sc, FrameBlock src)
throws DMLRuntimeException
{
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
LinkedList<Tuple2<Long,FrameBlock>> list = new LinkedList<>();
//create subblocks of the frame
int blksize = ConfigurationManager.getBlocksize();
for(int blockRow = 0; blockRow < (int)Math.ceil(src.getNumRows()/(double)blksize); blockRow++)
{
int maxRow = (blockRow*blksize + blksize < src.getNumRows()) ? blksize : src.getNumRows() - blockRow*blksize;
int roffset = blockRow*blksize;
FrameBlock block = new FrameBlock(src.getSchema());
//copy sub frame to block, incl meta data on the first block
src.slice( roffset, roffset+maxRow-1, 0, src.getNumColumns()-1, block );
if( roffset == 0 )
block.setColumnMetadata(src.getColumnMetadata());
//append block to output list
list.addLast(new Tuple2<>((long)roffset+1, block));
}
JavaPairRDD<Long,FrameBlock> result = sc.parallelizePairs(list);
if (DMLScript.STATISTICS) {
Statistics.accSparkParallelizeTime(System.nanoTime() - t0);
Statistics.incSparkParallelizeCount(1);
}
return result;
}
/**
* This method is a generic abstraction for calls from the buffer pool.
*
* @param rdd rdd object
* @param rlen number of rows
* @param clen number of columns
* @param brlen number of rows in a block
* @param bclen number of columns in a block
* @param nnz number of non-zeros
* @return matrix block
* @throws DMLRuntimeException if DMLRuntimeException occurs
*/
@SuppressWarnings("unchecked")
public static MatrixBlock toMatrixBlock(RDDObject rdd, int rlen, int clen, int brlen, int bclen, long nnz)
throws DMLRuntimeException
{
return toMatrixBlock(
(JavaPairRDD<MatrixIndexes, MatrixBlock>) rdd.getRDD(),
rlen, clen, brlen, bclen, nnz);
}
/**
* Utility method for creating a single matrix block out of a binary block RDD.
* Note that this collect call might trigger execution of any pending transformations.
*
* NOTE: This is an unguarded utility function, which requires memory for both the output matrix
* and its collected, blocked representation.
*
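* <p>Illustrative usage (a sketch; the RDD {@code rdd}, the dimensions, and the block
* sizes are hypothetical; an nnz of -1 indicates an unknown number of non-zeros):
* <pre>{@code
* MatrixBlock out = SparkExecutionContext.toMatrixBlock(rdd, 10000, 10000, 1000, 1000, -1);
* }</pre>
*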
* @param rdd JavaPairRDD for matrix block
* @param rlen number of rows
* @param clen number of columns
* @param brlen number of rows in a block
* @param bclen number of columns in a block
* @param nnz number of non-zeros
* @return matrix block
* @throws DMLRuntimeException if DMLRuntimeException occurs
*/
public static MatrixBlock toMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, int brlen, int bclen, long nnz)
throws DMLRuntimeException
{
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
MatrixBlock out = null;
if( rlen <= brlen && clen <= bclen ) //SINGLE BLOCK
{
//special case without copy and nnz maintenance
List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect();
if( list.size()>1 )
throw new DMLRuntimeException("Expecting no more than one result block.");
else if( list.size()==1 )
out = list.get(0)._2();
else //empty (e.g., after ops w/ outputEmpty=false)
out = new MatrixBlock(rlen, clen, true);
out.examSparsity();
}
else //MULTIPLE BLOCKS
{
//determine target sparse/dense representation
long lnnz = (nnz >= 0) ? nnz : (long)rlen * clen;
boolean sparse = MatrixBlock.evalSparseFormatInMemory(rlen, clen, lnnz);
//create output matrix block (w/ lazy allocation)
out = new MatrixBlock(rlen, clen, sparse, lnnz);
List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect();
//copy blocks one-at-a-time into output matrix block
long aNnz = 0;
for( Tuple2<MatrixIndexes,MatrixBlock> keyval : list )
{
//unpack index-block pair
MatrixIndexes ix = keyval._1();
MatrixBlock block = keyval._2();
//compute row/column block offsets
int row_offset = (int)(ix.getRowIndex()-1)*brlen;
int col_offset = (int)(ix.getColumnIndex()-1)*bclen;
int rows = block.getNumRows();
int cols = block.getNumColumns();
//handle compressed blocks (decompress for robustness)
if( block instanceof CompressedMatrixBlock )
block = ((CompressedMatrixBlock)block).decompress();
//append block
if( sparse ) { //SPARSE OUTPUT
//append block to sparse target in order to avoid shifting, where
//we use a shallow row copy in case of MCSR and single column blocks
//note: this append requires, for multiple column blocks, a final sort
out.appendToSparse(block, row_offset, col_offset, clen>bclen);
}
else { //DENSE OUTPUT
out.copy( row_offset, row_offset+rows-1,
col_offset, col_offset+cols-1, block, false );
}
//incremental nnz maintenance
aNnz += block.getNonZeros();
}
//post-processing output matrix
if( sparse && clen>bclen )
out.sortSparseRows();
out.setNonZeros(aNnz);
out.examSparsity();
}
if (DMLScript.STATISTICS) {
Statistics.accSparkCollectTime(System.nanoTime() - t0);
Statistics.incSparkCollectCount(1);
}
return out;
}
@SuppressWarnings("unchecked")
public static MatrixBlock toMatrixBlock(RDDObject rdd, int rlen, int clen, long nnz)
throws DMLRuntimeException
{
return toMatrixBlock(
(JavaPairRDD<MatrixIndexes, MatrixCell>) rdd.getRDD(),
rlen, clen, nnz);
}
/**
* Utility method for creating a single matrix block out of a binary cell RDD.
* Note that this collect call might trigger execution of any pending transformations.
*
* @param rdd JavaPairRDD for matrix block
* @param rlen number of rows
* @param clen number of columns
* @param nnz number of non-zeros
* @return matrix block
* @throws DMLRuntimeException if DMLRuntimeException occurs
*/
public static MatrixBlock toMatrixBlock(JavaPairRDD<MatrixIndexes, MatrixCell> rdd, int rlen, int clen, long nnz)
throws DMLRuntimeException
{
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
MatrixBlock out = null;
//determine target sparse/dense representation
long lnnz = (nnz >= 0) ? nnz : (long)rlen * clen;
boolean sparse = MatrixBlock.evalSparseFormatInMemory(rlen, clen, lnnz);
//create output matrix block (w/ lazy allocation)
out = new MatrixBlock(rlen, clen, sparse);
List<Tuple2<MatrixIndexes,MatrixCell>> list = rdd.collect();
//copy cells one-at-a-time into output matrix block
for( Tuple2<MatrixIndexes,MatrixCell> keyval : list )
{
//unpack index-block pair
MatrixIndexes ix = keyval._1();
MatrixCell cell = keyval._2();
//append cell to dense/sparse target in order to avoid shifting for sparse
//note: this append requires a final sort of sparse rows
out.appendValue((int)ix.getRowIndex()-1, (int)ix.getColumnIndex()-1, cell.getValue());
}
//post-processing output matrix
if( sparse )
out.sortSparseRows();
out.recomputeNonZeros();
out.examSparsity();
if (DMLScript.STATISTICS) {
Statistics.accSparkCollectTime(System.nanoTime() - t0);
Statistics.incSparkCollectCount(1);
}
return out;
}
public static PartitionedBlock<MatrixBlock> toPartitionedMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, int brlen, int bclen, long nnz)
throws DMLRuntimeException
{
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
PartitionedBlock<MatrixBlock> out = new PartitionedBlock<>(rlen, clen, brlen, bclen);
List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect();
//copy blocks one-at-a-time into output partitioned block
for( Tuple2<MatrixIndexes,MatrixBlock> keyval : list )
{
//unpack index-block pair
MatrixIndexes ix = keyval._1();
MatrixBlock block = keyval._2();
out.setBlock((int)ix.getRowIndex(), (int)ix.getColumnIndex(), block);
}
if (DMLScript.STATISTICS) {
Statistics.accSparkCollectTime(System.nanoTime() - t0);
Statistics.incSparkCollectCount(1);
}
return out;
}
@SuppressWarnings("unchecked")
public static FrameBlock toFrameBlock(RDDObject rdd, ValueType[] schema, int rlen, int clen)
throws DMLRuntimeException
{
JavaPairRDD<Long,FrameBlock> lrdd = (JavaPairRDD<Long,FrameBlock>) rdd.getRDD();
return toFrameBlock(lrdd, schema, rlen, clen);
}
public static FrameBlock toFrameBlock(JavaPairRDD<Long,FrameBlock> rdd, ValueType[] schema, int rlen, int clen)
throws DMLRuntimeException
{
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
if(schema == null)
schema = UtilFunctions.nCopies(clen, ValueType.STRING);
//create output frame block (w/ lazy allocation)
FrameBlock out = new FrameBlock(schema);
out.ensureAllocatedColumns(rlen);
List<Tuple2<Long,FrameBlock>> list = rdd.collect();
//copy blocks one-at-a-time into output frame block
for( Tuple2<Long,FrameBlock> keyval : list )
{
//unpack index-block pair
int ix = (int)(keyval._1() - 1);
FrameBlock block = keyval._2();
//copy into output frame
out.copy( ix, ix+block.getNumRows()-1, 0, block.getNumColumns()-1, block );
if( ix == 0 ) {
out.setColumnNames(block.getColumnNames());
out.setColumnMetadata(block.getColumnMetadata());
}
}
if (DMLScript.STATISTICS) {
Statistics.accSparkCollectTime(System.nanoTime() - t0);
Statistics.incSparkCollectCount(1);
}
return out;
}
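/**
* Writes a binary-block matrix RDD to HDFS and piggybacks the nnz computation
* on the write via a Spark accumulator.
*
* <p>Illustrative usage (a sketch; the RDD handle {@code rddHandle} and the output
* path are hypothetical):
* <pre>{@code
* long nnz = SparkExecutionContext.writeRDDtoHDFS(
*     rddHandle, "hdfs:/tmp/X", OutputInfo.BinaryBlockOutputInfo);
* }</pre>
*
* @param rdd rdd object of a binary-block matrix
* @param path hdfs output path
* @param oinfo output info
* @return aggregate number of non-zeros over all written blocks
*/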
@SuppressWarnings("unchecked")
public static long writeRDDtoHDFS( RDDObject rdd, String path, OutputInfo oinfo )
{
JavaPairRDD<MatrixIndexes,MatrixBlock> lrdd = (JavaPairRDD<MatrixIndexes, MatrixBlock>) rdd.getRDD();
//piggyback nnz maintenance on write
LongAccumulator aNnz = getSparkContextStatic().sc().longAccumulator("nnz");
lrdd = lrdd.mapValues(new ComputeBinaryBlockNnzFunction(aNnz));
//saving the file is an action which also triggers the nnz maintenance
lrdd.saveAsHadoopFile(path,
oinfo.outputKeyClass,
oinfo.outputValueClass,
oinfo.outputFormatClass);
//return nnz aggregate of all blocks
return aNnz.value();
}
@SuppressWarnings("unchecked")
public static void writeFrameRDDtoHDFS( RDDObject rdd, String path, OutputInfo oinfo )
{
JavaPairRDD<?, FrameBlock> lrdd = (JavaPairRDD<Long, FrameBlock>) rdd.getRDD();
//convert keys to writables if necessary
if( oinfo == OutputInfo.BinaryBlockOutputInfo ) {
lrdd = ((JavaPairRDD<Long, FrameBlock>)lrdd).mapToPair(
new LongFrameToLongWritableFrameFunction());
oinfo = OutputInfo.BinaryBlockFrameOutputInfo;
}
//saving the file is an action which triggers the actual write
lrdd.saveAsHadoopFile(path,
oinfo.outputKeyClass,
oinfo.outputValueClass,
oinfo.outputFormatClass);
}
///////////////////////////////////////////
// Cleanup of RDDs and Broadcast variables
///////
/**
* Adds a child rdd object to the lineage of a parent rdd.
*
* @param varParent parent variable
* @param varChild child variable
* @throws DMLRuntimeException if DMLRuntimeException occurs
*/
public void addLineageRDD(String varParent, String varChild)
throws DMLRuntimeException
{
RDDObject parent = getCacheableData(varParent).getRDDHandle();
RDDObject child = getCacheableData(varChild).getRDDHandle();
parent.addLineageChild( child );
}
/**
* Adds a child broadcast object to the lineage of a parent rdd.
*
* @param varParent parent variable
* @param varChild child variable
* @throws DMLRuntimeException if DMLRuntimeException occurs
*/
public void addLineageBroadcast(String varParent, String varChild)
throws DMLRuntimeException
{
RDDObject parent = getCacheableData(varParent).getRDDHandle();
BroadcastObject<?> child = getCacheableData(varChild).getBroadcastHandle();
parent.addLineageChild( child );
}
public void addLineage(String varParent, String varChild, boolean broadcast)
throws DMLRuntimeException
{
if( broadcast )
addLineageBroadcast(varParent, varChild);
else
addLineageRDD(varParent, varChild);
}
@Override
public void cleanupCacheableData( CacheableData<?> mo )
throws DMLRuntimeException
{
//NOTE: this method overrides the default behavior of cleanupMatrixObject
//and hence is transparently used by rmvar instructions and other users. The
//core difference is the lineage-based cleanup of RDD and broadcast variables.
try
{
if ( mo.isCleanupEnabled() )
{
//compute ref count only if matrix cleanup actually necessary
if ( !getVariables().hasReferences(mo) )
{
//clean cached data
mo.clearData();
//clean hdfs data if no pending rdd operations on it
if( mo.isHDFSFileExists() && mo.getFileName()!=null ) {
if( mo.getRDDHandle()==null ) {
MapReduceTool.deleteFileWithMTDIfExistOnHDFS(mo.getFileName());
}
else { //deferred file removal
RDDObject rdd = mo.getRDDHandle();
rdd.setHDFSFilename(mo.getFileName());
}
}
//cleanup RDD and broadcast variables (recursive)
//note: requires that mo.clearData already removed back references
if( mo.getRDDHandle()!=null ) {
rCleanupLineageObject(mo.getRDDHandle());
}
if( mo.getBroadcastHandle()!=null ) {
rCleanupLineageObject(mo.getBroadcastHandle());
}
}
}
}
catch(Exception ex)
{
throw new DMLRuntimeException(ex);
}
}
@SuppressWarnings({ "rawtypes", "unchecked" })
private void rCleanupLineageObject(LineageObject lob)
throws IOException
{
//abort recursive cleanup if there are still consumers
if( lob.getNumReferences() > 0 )
return;
//abort if still reachable through matrix object (via back references for
//robustness in function calls and to prevent repeated scans of the symbol table)
if( lob.hasBackReference() )
return;
//cleanup current lineage object (from driver/executors)
//incl deferred hdfs file removal (only if metadata set by cleanup call)
if( lob instanceof RDDObject ) {
RDDObject rdd = (RDDObject)lob;
int rddID = rdd.getRDD().id();
cleanupRDDVariable(rdd.getRDD());
if( rdd.getHDFSFilename()!=null ) { //deferred file removal
MapReduceTool.deleteFileWithMTDIfExistOnHDFS(rdd.getHDFSFilename());
}
if( rdd.isParallelizedRDD() )
_parRDDs.deregisterRDD(rddID);
}
else if( lob instanceof BroadcastObject ) {
PartitionedBroadcast pbm = ((BroadcastObject)lob).getBroadcast();
if( pbm != null ) //robustness for evictions
for( Broadcast<PartitionedBlock> bc : pbm.getBroadcasts() )
cleanupBroadcastVariable(bc);
CacheableData.addBroadcastSize(-((BroadcastObject)lob).getSize());
}
//recursively process lineage children
for( LineageObject c : lob.getLineageChilds() ){
c.decrementNumReferences();
rCleanupLineageObject(c);
}
}
/**
* This call destroys a broadcast variable at all executors and the driver.
* Hence, it is intended to be used on rmvar only. Depending on the
* ASYNCHRONOUS_VAR_DESTROY configuration, this is asynchronous or not.
*
* @param bvar broadcast variable
*/
public static void cleanupBroadcastVariable(Broadcast<?> bvar)
{
//In comparison to 'unpersist' (which would only delete the broadcast
//from the executors), this call also deletes related data from the driver.
if( bvar.isValid() ) {
bvar.destroy( !ASYNCHRONOUS_VAR_DESTROY );
}
}
/**
* This call removes an rdd variable from executor memory and disk if required.
* Hence, it is intended to be used on rmvar only. Depending on the
* ASYNCHRONOUS_VAR_DESTROY configuration, this is asynchronous or not.
*
* @param rvar rdd variable to remove
*/
public static void cleanupRDDVariable(JavaPairRDD<?,?> rvar)
{
if( rvar.getStorageLevel()!=StorageLevel.NONE() ) {
rvar.unpersist( !ASYNCHRONOUS_VAR_DESTROY );
}
}
@SuppressWarnings("unchecked")
public void repartitionAndCacheMatrixObject( String var )
throws DMLRuntimeException
{
MatrixObject mo = getMatrixObject(var);
MatrixCharacteristics mcIn = mo.getMatrixCharacteristics();
//double check size to avoid unnecessary spark context creation
if( !OptimizerUtils.exceedsCachingThreshold(mo.getNumColumns(), (double)
OptimizerUtils.estimateSizeExactSparsity(mcIn)) )
return;
//get input rdd and default storage level
JavaPairRDD<MatrixIndexes,MatrixBlock> in = (JavaPairRDD<MatrixIndexes, MatrixBlock>)
getRDDHandleForMatrixObject(mo, InputInfo.BinaryBlockInputInfo);
//avoid unnecessary caching of input in order to reduce memory pressure
if( mo.getRDDHandle().allowsShortCircuitRead()
&& isRDDMarkedForCaching(in.id()) && !isRDDCached(in.id()) ) {
in = (JavaPairRDD<MatrixIndexes,MatrixBlock>)
((RDDObject)mo.getRDDHandle().getLineageChilds().get(0)).getRDD();
//investigate issue of unnecessarily large number of partitions
int numPartitions = SparkUtils.getNumPreferredPartitions(mcIn, in);
if( numPartitions < in.getNumPartitions() )
in = in.coalesce( numPartitions );
}
//repartition rdd (force creation of a shuffled rdd via merge); note: without deep copy although
//executed on the original data, because there will be no merge, i.e., no key duplicates
JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDAggregateUtils.mergeByKey(in, false);
//convert mcsr into memory-efficient csr if potentially sparse
if( OptimizerUtils.checkSparseBlockCSRConversion(mcIn) ) {
out = out.mapValues(new CreateSparseBlockFunction(SparseBlock.Type.CSR));
}
//persist rdd in default storage level
out.persist( Checkpoint.DEFAULT_STORAGE_LEVEL )
.count(); //trigger caching to prevent contention
//create new rdd handle, in-place of current matrix object
RDDObject inro = mo.getRDDHandle(); //guaranteed to exist (see above)
RDDObject outro = new RDDObject(out); //create new rdd object
outro.setCheckpointRDD(true); //mark as checkpointed
outro.addLineageChild(inro); //keep lineage to prevent cycles on cleanup
mo.setRDDHandle(outro);
}
@SuppressWarnings("unchecked")
public void cacheMatrixObject( String var )
throws DMLRuntimeException
{
//get input rdd and default storage level
MatrixObject mo = getMatrixObject(var);
//double check size to avoid unnecessary spark context creation
if( !OptimizerUtils.exceedsCachingThreshold(mo.getNumColumns(), (double)
OptimizerUtils.estimateSizeExactSparsity(mo.getMatrixCharacteristics())) )
return;
JavaPairRDD<MatrixIndexes,MatrixBlock> in = (JavaPairRDD<MatrixIndexes, MatrixBlock>)
getRDDHandleForMatrixObject(mo, InputInfo.BinaryBlockInputInfo);
//persist rdd (force rdd caching, if not already cached)
if( !isRDDCached(in.id()) )
in.count(); //trigger caching to prevent contention
}
public void setThreadLocalSchedulerPool(String poolName) {
if( FAIR_SCHEDULER_MODE ) {
getSparkContext().sc().setLocalProperty(
"spark.scheduler.pool", poolName);
}
}
public void cleanupThreadLocalSchedulerPool() {
if( FAIR_SCHEDULER_MODE ) {
getSparkContext().sc().setLocalProperty(
"spark.scheduler.pool", null);
}
}
private boolean isRDDMarkedForCaching( int rddID ) {
JavaSparkContext jsc = getSparkContext();
return jsc.sc().getPersistentRDDs().contains(rddID);
}
public boolean isRDDCached( int rddID ) {
//check that rdd is marked for caching
JavaSparkContext jsc = getSparkContext();
if( !jsc.sc().getPersistentRDDs().contains(rddID) ) {
return false;
}
//check that rdd is actually already cached
for( RDDInfo info : jsc.sc().getRDDStorageInfo() ) {
if( info.id() == rddID )
return info.isCached();
}
return false;
}
///////////////////////////////////////////
// Spark configuration handling
///////
/**
* Obtains the lazily analyzed spark cluster configuration.
*
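* <p>Illustrative usage (a sketch):
* <pre>{@code
* double broadcastBudget = SparkExecutionContext.getBroadcastMemoryBudget();
* double dataBudget = SparkExecutionContext.getDataMemoryBudget(false, true);
* int numCores = SparkExecutionContext.getDefaultParallelism(false);
* }</pre>
*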
* @return spark cluster configuration
*/
public static SparkClusterConfig getSparkClusterConfig() {
//lazy creation of spark cluster config
if( _sconf == null )
_sconf = new SparkClusterConfig();
return _sconf;
}
/**
* Obtains the available memory budget for broadcast variables in bytes.
*
* @return broadcast memory budget
*/
public static double getBroadcastMemoryBudget() {
return getSparkClusterConfig()
.getBroadcastMemoryBudget();
}
/**
* Obtain the available memory budget for data storage in bytes.
*
* @param min flag for minimum data budget
* @param refresh flag for refresh with spark context
* @return data memory budget
*/
public static double getDataMemoryBudget(boolean min, boolean refresh) {
return getSparkClusterConfig()
.getDataMemoryBudget(min, refresh);
}
/**
* Obtain the number of executors in the cluster (excluding the driver).
*
* @return number of executors
*/
public static int getNumExecutors() {
return getSparkClusterConfig()
.getNumExecutors();
}
/**
* Obtain the default degree of parallelism (cores in the cluster).
*
* @param refresh flag for refresh with spark context
* @return default degree of parallelism
*/
public static int getDefaultParallelism(boolean refresh) {
return getSparkClusterConfig()
.getDefaultParallelism(refresh);
}
/**
* Captures relevant spark cluster configuration properties, e.g., memory budgets and
* degree of parallelism. This configuration abstracts legacy (< Spark 1.6) and current
* configurations and provides a unified view.
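*
* <p>Worked example (a sketch; the executor memory value is hypothetical): with the
* Spark 1.6+ defaults spark.memory.fraction=0.6 and spark.executor.memory=10g, the
* usable executor memory is 10 GB minus 300 MB reserved, i.e., about 9.7 GB; the
* maximum data budget per executor is 9.7 GB * 0.6, i.e., about 5.8 GB; and the
* broadcast budget is 35% of that, i.e., about 2 GB.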
*/
public static class SparkClusterConfig
{
//broadcasts are stored in mem-and-disk in data space; this config
//defines the fraction of data space to be used as broadcast budget
private static final double BROADCAST_DATA_FRACTION = 0.35;
//forward private config from Spark's UnifiedMemoryManager.scala (>1.6)
private static final long RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024;
//meta configurations
private boolean _legacyVersion = false; //spark version <1.6
private boolean _confOnly = false; //infrastructure info based on config
//memory management configurations
private long _memExecutor = -1; //mem per executor
private double _memDataMinFrac = -1; //minimum data fraction
private double _memDataMaxFrac = -1; //maximum data fraction
private double _memBroadcastFrac = -1; //broadcast fraction
//degree of parallelism configurations
private int _numExecutors = -1; //total executors
private int _defaultPar = -1; //total vcores
public SparkClusterConfig()
{
SparkConf sconf = createSystemMLSparkConf();
_confOnly = true;
//parse version and config
String sparkVersion = getSparkVersionString();
_legacyVersion = (UtilFunctions.compareVersion(sparkVersion, "1.6.0") < 0
|| sconf.getBoolean("spark.memory.useLegacyMode", false) );
//obtain basic spark configurations
if( _legacyVersion )
analyzeSparkConfiguationLegacy(sconf);
else
analyzeSparkConfiguation(sconf);
//log debug of created spark cluster config
if( LOG.isDebugEnabled() )
LOG.debug( this.toString() );
}
public long getBroadcastMemoryBudget() {
return (long) (_memExecutor * _memBroadcastFrac);
}
public long getDataMemoryBudget(boolean min, boolean refresh) {
//always get the current num executors on refresh because this might
//change if not all executors are initially allocated and it is plan-relevant
int numExec = _numExecutors;
if( (refresh && !_confOnly) || isSparkContextCreated() ) {
JavaSparkContext jsc = getSparkContextStatic();
numExec = Math.max(jsc.sc().getExecutorMemoryStatus().size() - 1, 1);
}
//compute data memory budget
return (long) ( numExec * _memExecutor *
(min ? _memDataMinFrac : _memDataMaxFrac) );
}
public int getNumExecutors() {
if( _numExecutors < 0 )
analyzeSparkParallelismConfiguation(null);
return _numExecutors;
}
public int getDefaultParallelism(boolean refresh) {
if( _defaultPar < 0 && !refresh )
analyzeSparkParallelismConfiguation(null);
//always get the current default parallelism on refresh because this might
//change if not all executors are initially allocated and it is plan-relevant
int par = ( (refresh && !_confOnly) || isSparkContextCreated() ) ?
getSparkContextStatic().defaultParallelism() : _defaultPar;
return Math.max(par, 1); //robustness min parallelism
}
public void analyzeSparkConfiguationLegacy(SparkConf conf) {
//ensure allocated spark conf
SparkConf sconf = (conf == null) ? createSystemMLSparkConf() : conf;
//parse absolute executor memory
_memExecutor = UtilFunctions.parseMemorySize(
sconf.get("spark.executor.memory", "1g"));
//get data and shuffle memory ratios (defaults if not specified in job conf)
double dataFrac = sconf.getDouble("spark.storage.memoryFraction", 0.6); //default 60%
_memDataMinFrac = dataFrac;
_memDataMaxFrac = dataFrac;
_memBroadcastFrac = dataFrac * BROADCAST_DATA_FRACTION; //default 18%
//analyze spark degree of parallelism
analyzeSparkParallelismConfiguation(sconf);
}
public void analyzeSparkConfiguation(SparkConf conf) {
//ensure allocated spark conf
SparkConf sconf = (conf == null) ? createSystemMLSparkConf() : conf;
//parse absolute executor memory, incl fixed cut off
_memExecutor = UtilFunctions.parseMemorySize(
sconf.get("spark.executor.memory", "1g"))
- RESERVED_SYSTEM_MEMORY_BYTES;
//get data and shuffle memory ratios (defaults if not specified in job conf)
_memDataMinFrac = sconf.getDouble("spark.memory.storageFraction", 0.5); //default 50%
_memDataMaxFrac = sconf.getDouble("spark.memory.fraction", 0.6); //default 60%
_memBroadcastFrac = _memDataMaxFrac * BROADCAST_DATA_FRACTION; //default 21%
//analyze spark degree of parallelism
analyzeSparkParallelismConfiguation(sconf);
}
private void analyzeSparkParallelismConfiguation(SparkConf conf) {
//ensure allocated spark conf
SparkConf sconf = (conf == null) ? createSystemMLSparkConf() : conf;
int numExecutors = sconf.getInt("spark.executor.instances", -1);
int numCoresPerExec = sconf.getInt("spark.executor.cores", -1);
int defaultPar = sconf.getInt("spark.default.parallelism", -1);
if( numExecutors > 1 && (defaultPar > 1 || numCoresPerExec > 1) ) {
_numExecutors = numExecutors;
_defaultPar = (defaultPar>1) ? defaultPar : numExecutors * numCoresPerExec;
_confOnly &= true;
}
else {
//get default parallelism (total number of executors and cores)
//note: spark context provides this information while conf does not
//(for num executors we need to correct for driver and local mode)
JavaSparkContext jsc = getSparkContextStatic();
_numExecutors = Math.max(jsc.sc().getExecutorMemoryStatus().size() - 1, 1);
_defaultPar = jsc.defaultParallelism();
_confOnly &= false; //implies env info refresh w/ spark context
}
}
/**
* Obtains the spark version string. If the spark context has been created,
* we simply get it from the context; otherwise, we use Spark internal
* constants to avoid creating the spark context just for the version.
*
* @return spark version string
*/
private static String getSparkVersionString() {
//check for existing spark context
if( isSparkContextCreated() )
return getSparkContextStatic().version();
//use spark internal constant to avoid context creation
return org.apache.spark.package$.MODULE$.SPARK_VERSION();
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("SparkClusterConfig: \n");
sb.append("-- legacyVersion = " + _legacyVersion + " ("+getSparkVersionString()+")\n" );
sb.append("-- confOnly = " + _confOnly + "\n");
sb.append("-- numExecutors = " + _numExecutors + "\n");
sb.append("-- defaultPar = " + _defaultPar + "\n");
sb.append("-- memExecutor = " + _memExecutor + "\n");
sb.append("-- memDataMinFrac = " + _memDataMinFrac + "\n");
sb.append("-- memDataMaxFrac = " + _memDataMaxFrac + "\n");
sb.append("-- memBroadcastFrac = " + _memBroadcastFrac + "\n");
return sb.toString();
}
}
private static class MemoryManagerParRDDs
{
private final long _limit;
private long _size;
private HashMap<Integer, Long> _rdds;
public MemoryManagerParRDDs(double fractionMem) {
_limit = (long)(fractionMem * InfrastructureAnalyzer.getLocalMaxMemory());
_size = 0;
_rdds = new HashMap<>();
}
public synchronized boolean reserve(long rddSize) {
boolean ret = (rddSize + _size < _limit);
_size += ret ? rddSize : 0;
return ret;
}
public synchronized void registerRDD(int rddID, long rddSize, boolean reserved) {
if( !reserved ) {
throw new RuntimeException("Unsupported rdd registration "
+ "without size reservation for "+rddSize+" bytes.");
}
_rdds.put(rddID, rddSize);
}
public synchronized void deregisterRDD(int rddID) {
long rddSize = _rdds.remove(rddID);
_size -= rddSize;
}
public synchronized void clear() {
_size = 0;
_rdds.clear();
}
}
}