| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.sysds.runtime.controlprogram.context; |
| |
| import org.apache.commons.lang3.ArrayUtils; |
| import org.apache.hadoop.io.LongWritable; |
| import org.apache.hadoop.io.Text; |
| import org.apache.spark.SparkConf; |
| import org.apache.spark.api.java.JavaPairRDD; |
| import org.apache.spark.api.java.JavaSparkContext; |
| import org.apache.spark.broadcast.Broadcast; |
| import org.apache.spark.storage.RDDInfo; |
| import org.apache.spark.storage.StorageLevel; |
| import org.apache.spark.util.LongAccumulator; |
| import org.apache.sysds.api.DMLScript; |
| import org.apache.sysds.api.mlcontext.MLContext; |
| import org.apache.sysds.api.mlcontext.MLContextUtil; |
| import org.apache.sysds.common.Types.DataType; |
| import org.apache.sysds.common.Types.ExecMode; |
| import org.apache.sysds.common.Types.FileFormat; |
| import org.apache.sysds.common.Types.ValueType; |
| import org.apache.sysds.conf.ConfigurationManager; |
| import org.apache.sysds.conf.DMLConfig; |
| import org.apache.sysds.hops.OptimizerUtils; |
| import org.apache.sysds.lops.Checkpoint; |
| import org.apache.sysds.runtime.DMLRuntimeException; |
| import org.apache.sysds.runtime.compress.CompressedMatrixBlock; |
| import org.apache.sysds.runtime.controlprogram.Program; |
| import org.apache.sysds.runtime.controlprogram.caching.CacheBlock; |
| import org.apache.sysds.runtime.controlprogram.caching.CacheableData; |
| import org.apache.sysds.runtime.controlprogram.caching.FrameObject; |
| import org.apache.sysds.runtime.controlprogram.caching.MatrixObject; |
| import org.apache.sysds.runtime.controlprogram.caching.TensorObject; |
| import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; |
import org.apache.sysds.runtime.data.SparseBlock;
import org.apache.sysds.runtime.data.TensorBlock;
| import org.apache.sysds.runtime.data.TensorIndexes; |
| import org.apache.sysds.runtime.instructions.cp.Data; |
| import org.apache.sysds.runtime.instructions.spark.data.BroadcastObject; |
| import org.apache.sysds.runtime.instructions.spark.data.LineageObject; |
| import org.apache.sysds.runtime.instructions.spark.data.PartitionedBlock; |
| import org.apache.sysds.runtime.instructions.spark.data.PartitionedBroadcast; |
| import org.apache.sysds.runtime.instructions.spark.data.RDDObject; |
| import org.apache.sysds.runtime.instructions.spark.functions.ComputeBinaryBlockNnzFunction; |
| import org.apache.sysds.runtime.instructions.spark.functions.CopyFrameBlockPairFunction; |
| import org.apache.sysds.runtime.instructions.spark.functions.CopyTextInputFunction; |
| import org.apache.sysds.runtime.instructions.spark.functions.CreateSparseBlockFunction; |
| import org.apache.sysds.runtime.instructions.spark.utils.FrameRDDConverterUtils.LongFrameToLongWritableFrameFunction; |
| import org.apache.sysds.runtime.instructions.spark.utils.RDDAggregateUtils; |
| import org.apache.sysds.runtime.instructions.spark.utils.SparkUtils; |
| import org.apache.sysds.runtime.io.IOUtilFunctions; |
| import org.apache.sysds.runtime.io.InputOutputInfo; |
| import org.apache.sysds.runtime.matrix.data.FrameBlock; |
| import org.apache.sysds.runtime.matrix.data.MatrixBlock; |
| import org.apache.sysds.runtime.matrix.data.MatrixCell; |
| import org.apache.sysds.runtime.matrix.data.MatrixIndexes; |
| import org.apache.sysds.runtime.meta.DataCharacteristics; |
| import org.apache.sysds.runtime.meta.MatrixCharacteristics; |
| import org.apache.sysds.runtime.meta.TensorCharacteristics; |
| import org.apache.sysds.runtime.util.HDFSTool; |
| import org.apache.sysds.runtime.util.UtilFunctions; |
| import org.apache.sysds.utils.MLContextProxy; |
| import org.apache.sysds.utils.Statistics; |
| import scala.Tuple2; |
| |
| import java.io.IOException; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.concurrent.Future; |
| import java.util.stream.Collectors; |
| import java.util.stream.LongStream; |
| |
| |
| public class SparkExecutionContext extends ExecutionContext |
| { |
| |
| //internal configurations |
| private static final boolean LAZY_SPARKCTX_CREATION = true; |
| private static final boolean ASYNCHRONOUS_VAR_DESTROY = true; |
| public static final boolean FAIR_SCHEDULER_MODE = true; |
| |
| //executor memory and relative fractions as obtained from the spark configuration |
| private static SparkClusterConfig _sconf = null; |
| |
| //singleton spark context (as there can be only one spark context per JVM) |
| private static JavaSparkContext _spctx = null; |
| |
	//registry of parallelized RDDs to enforce that at any time, we spend at most
	//10% of the JVM max heap size for parallelized RDDs; if this is not sufficient,
	//matrices or frames are exported to HDFS and the RDDs are created from files.
| //TODO unify memory management for CP, par RDDs, and potentially broadcasts |
| private static final MemoryManagerParRDDs _parRDDs = new MemoryManagerParRDDs(0.1); |
| |
| //pool of reused fair scheduler pool names (unset bits indicate availability) |
| private static boolean[] _poolBuff = FAIR_SCHEDULER_MODE ? |
| new boolean[InfrastructureAnalyzer.getLocalParallelism()] : null; |
| |
| protected SparkExecutionContext(boolean allocateVars, boolean allocateLineage, Program prog) { |
| //protected constructor to force use of ExecutionContextFactory |
| super( allocateVars, allocateLineage, prog ); |
| |
| //spark context creation via internal initializer |
| if( !LAZY_SPARKCTX_CREATION || DMLScript.getGlobalExecMode()==ExecMode.SPARK ) { |
| initSparkContext(); |
| } |
| } |
| |
| /** |
| * Returns the used singleton spark context. In case of lazy spark context |
| * creation, this methods blocks until the spark context is created. |
| * |
| * @return java spark context |
| */ |
| public JavaSparkContext getSparkContext() { |
		//lazy spark context creation on demand (lazy instead of asynchronous
		//to avoid waiting for an uninitialized spark context on close)
| if( LAZY_SPARKCTX_CREATION ) { |
| initSparkContext(); |
| } |
| |
| //return the created spark context |
| return _spctx; |
| } |
| |
| public static JavaSparkContext getSparkContextStatic() { |
| initSparkContext(); |
| return _spctx; |
| } |
| |
| /** |
| * Indicates if the spark context has been created or has |
| * been passed in from outside. |
| * |
| * @return true if spark context created |
| */ |
| public synchronized static boolean isSparkContextCreated() { |
| return (_spctx != null); |
| } |
| |
| public static void resetSparkContextStatic() { |
| _spctx = null; |
| } |
| |
| public void close() { |
| synchronized( SparkExecutionContext.class ) { |
| if( _spctx != null ) { |
| //stop the spark context if existing |
| _spctx.stop(); |
| //make sure stopped context is never used again |
| _spctx = null; |
| } |
| } |
| } |
| |
| public static boolean isLazySparkContextCreation(){ |
| return LAZY_SPARKCTX_CREATION; |
| } |
| |
| private synchronized static void initSparkContext() |
| { |
| //check for redundant spark context init |
| if( _spctx != null ) |
| return; |
| |
| long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; |
| |
		//create a default spark context (master, appname, etc. refer to system properties
		//as given in the spark configuration or during spark-submit)
| |
| MLContext mlCtxObj = MLContextProxy.getActiveMLContext(); |
| if(mlCtxObj != null) |
| { |
			// This path is taken when DML is called through the Spark shell.
			// TODO clean up the passing of static variables later; for now this
			// approach involves only minimal changes to DMLScript.
| _spctx = MLContextUtil.getJavaSparkContext(mlCtxObj); |
| } |
| else |
| { |
| if(DMLScript.USE_LOCAL_SPARK_CONFIG) { |
				// local master for integration testing; the number of threads
				// is taken from the DML configuration (LOCAL_SPARK_NUM_THREADS)
| SparkConf conf = createSystemDSSparkConf() |
| .setMaster("local[" + |
| ConfigurationManager.getDMLConfig().getTextValue(DMLConfig.LOCAL_SPARK_NUM_THREADS)+ |
| "]").setAppName("My local integration test app"); |
				// This is discouraged in Spark, but was added only for test cases that cannot stop the context properly
| // conf.set("spark.driver.allowMultipleContexts", "true"); |
| conf.set("spark.ui.enabled", "false"); |
| _spctx = new JavaSparkContext(conf); |
| } |
| else //default cluster setup |
| { |
| //setup systemds-preferred spark configuration (w/o user choice) |
| SparkConf conf = createSystemDSSparkConf(); |
| _spctx = new JavaSparkContext(conf); |
| } |
| |
| _parRDDs.clear(); |
| } |
| |
		// Warn if spark.driver.maxResultSize is set too small for CP collect; it needs
		// to be set before starting the Spark context.
		String strDriverMaxResSize = _spctx.getConf().get("spark.driver.maxResultSize", "1g");
		long driverMaxResSize = UtilFunctions.parseMemorySize(strDriverMaxResSize);
		if (driverMaxResSize != 0 && driverMaxResSize<OptimizerUtils.getLocalMemBudget() && !DMLScript.USE_LOCAL_SPARK_CONFIG)
			LOG.warn("Configuration parameter spark.driver.maxResultSize set to " + UtilFunctions.formatMemorySize(driverMaxResSize) + "."
				+ " You can set it through the Spark default configuration either to 0 (unlimited) or to the available memory budget of size "
				+ UtilFunctions.formatMemorySize((long)OptimizerUtils.getLocalMemBudget()) + ".");
| |
| //globally add binaryblock serialization framework for all hdfs read/write operations |
| //TODO if spark context passed in from outside (mlcontext), we need to clean this up at the end |
| if( HDFSTool.USE_BINARYBLOCK_SERIALIZATION ) |
| HDFSTool.addBinaryBlockSerializationFramework( _spctx.hadoopConfiguration() ); |
| |
| //statistics maintenance |
| if( DMLScript.STATISTICS ){ |
| Statistics.setSparkCtxCreateTime(System.nanoTime()-t0); |
| } |
| } |
| |
| /** |
| * Sets up a SystemDS-preferred Spark configuration based on the implicit |
| * default configuration (as passed via configurations from outside). |
| * |
| * @return spark configuration |
| */ |
| public static SparkConf createSystemDSSparkConf() { |
| SparkConf conf = new SparkConf(); |
| |
| //always set unlimited result size (required for cp collect) |
| conf.set("spark.driver.maxResultSize", "0"); |
| |
| //always use the fair scheduler (for single jobs, it's equivalent to fifo |
| //but for concurrent jobs in parfor it ensures better data locality because |
| //round robin assignment mitigates the problem of 'sticky slots') |
| if( FAIR_SCHEDULER_MODE ) { |
| conf.set("spark.scheduler.mode", "FAIR"); |
| } |
| |
| //increase scheduler delay (usually more robust due to better data locality) |
| if( !conf.contains("spark.locality.wait") ) { //default 3s |
| conf.set("spark.locality.wait", "5s"); |
| } |
| |
| //increase max message size for robustness |
| String sparkVersion = org.apache.spark.package$.MODULE$.SPARK_VERSION(); |
| String msgSizeConf = (UtilFunctions.compareVersion(sparkVersion, "2.0.0") < 0) ? |
| "spark.akka.frameSize" : "spark.rpc.message.maxSize"; |
| if( !conf.contains(msgSizeConf) ) { //default 128MB |
| conf.set(msgSizeConf, "512"); |
| } |
| |
| return conf; |
| } |
| |
| public static boolean isLocalMaster() { |
| return getSparkContextStatic().isLocal(); |
| } |
| |
| /** |
| * Spark instructions should call this for all matrix inputs except broadcast |
| * variables. |
| * |
| * @param varname variable name |
| * @return JavaPairRDD of MatrixIndexes-MatrixBlocks |
| */ |
| @SuppressWarnings("unchecked") |
| public JavaPairRDD<MatrixIndexes,MatrixBlock> getBinaryMatrixBlockRDDHandleForVariable(String varname ) { |
| MatrixObject mo = getMatrixObject(varname); |
| return (JavaPairRDD<MatrixIndexes,MatrixBlock>) |
| getRDDHandleForMatrixObject(mo, FileFormat.BINARY, -1, true); |
| } |
| |
| @SuppressWarnings("unchecked") |
| public JavaPairRDD<MatrixIndexes,MatrixBlock> getBinaryMatrixBlockRDDHandleForVariable(String varname, int numParts, boolean inclEmpty ) { |
| MatrixObject mo = getMatrixObject(varname); |
| return (JavaPairRDD<MatrixIndexes,MatrixBlock>) |
| getRDDHandleForMatrixObject(mo, FileFormat.BINARY, numParts, inclEmpty); |
| } |
| |
| /** |
| * Spark instructions should call this for all tensor inputs except broadcast |
| * variables. |
| * |
| * @param varname variable name |
| * @return JavaPairRDD of TensorIndexes-HomogTensors |
| */ |
| @SuppressWarnings("unchecked") |
| public JavaPairRDD<TensorIndexes, TensorBlock> getBinaryTensorBlockRDDHandleForVariable(String varname ) { |
| TensorObject to = getTensorObject(varname); |
| return (JavaPairRDD<TensorIndexes, TensorBlock>) |
| getRDDHandleForTensorObject(to, FileFormat.BINARY, -1, true); |
| } |
| |
| @SuppressWarnings("unchecked") |
| public JavaPairRDD<TensorIndexes, TensorBlock> getBinaryTensorBlockRDDHandleForVariable(String varname, int numParts, boolean inclEmpty ) { |
| TensorObject to = getTensorObject(varname); |
| return (JavaPairRDD<TensorIndexes, TensorBlock>) |
| getRDDHandleForTensorObject(to, FileFormat.BINARY, numParts, inclEmpty); |
| } |
| |
| /** |
| * Spark instructions should call this for all frame inputs except broadcast |
| * variables. |
| * |
| * @param varname variable name |
| * @return JavaPairRDD of Longs-FrameBlocks |
| */ |
| @SuppressWarnings("unchecked") |
| public JavaPairRDD<Long,FrameBlock> getFrameBinaryBlockRDDHandleForVariable( String varname ) { |
| FrameObject fo = getFrameObject(varname); |
| JavaPairRDD<Long,FrameBlock> out = (JavaPairRDD<Long,FrameBlock>) |
| getRDDHandleForFrameObject(fo, FileFormat.BINARY); |
| return out; |
| } |
| |
| public JavaPairRDD<?,?> getRDDHandleForVariable( String varname, FileFormat fmt, int numParts, boolean inclEmpty ) { |
| Data dat = getVariable(varname); |
| if( dat instanceof MatrixObject ) { |
| MatrixObject mo = getMatrixObject(varname); |
| return getRDDHandleForMatrixObject(mo, fmt, numParts, inclEmpty); |
| } |
| else if( dat instanceof FrameObject ) { |
| FrameObject fo = getFrameObject(varname); |
| return getRDDHandleForFrameObject(fo, fmt); |
| } |
| else { |
| throw new DMLRuntimeException("Failed to obtain RDD for data type other than matrix or frame."); |
| } |
| } |
| |
| public JavaPairRDD<?,?> getRDDHandleForMatrixObject( MatrixObject mo, FileFormat fmt ) { |
| return getRDDHandleForMatrixObject(mo, fmt, -1, true); |
| } |
| |
| @SuppressWarnings({ "unchecked", "resource" }) |
| public JavaPairRDD<?,?> getRDDHandleForMatrixObject( MatrixObject mo, FileFormat fmt, int numParts, boolean inclEmpty ) { |
		//NOTE: this logic should eventually be integrated into MatrixObject.
		//However, for now we cannot assume that spark libraries are always
		//available and hence only store generic references in the matrix
		//object while all the logic resides in the SparkExecutionContext.
| |
| JavaSparkContext sc = getSparkContext(); |
| JavaPairRDD<?,?> rdd = null; |
| InputOutputInfo inputInfo = InputOutputInfo.get(DataType.MATRIX, fmt); |
		//CASE 1: rdd already existing (reuse if checkpoint, or trigger pending
		//rdd operations if not yet cached, but prevent re-evaluation of rdd
		//operations if already executed and cached)
| if( mo.getRDDHandle()!=null |
| && (mo.getRDDHandle().isCheckpointRDD() || !mo.isCached(false)) ) |
| { |
| //return existing rdd handling (w/o input format change) |
| rdd = mo.getRDDHandle().getRDD(); |
| } |
| //CASE 2: dirty in memory data or cached result of rdd operations |
| else if( mo.isDirty() || mo.isCached(false) || mo.isFederated() ) |
| { |
| //get in-memory matrix block and parallelize it |
| //w/ guarded parallelize (fallback to export, rdd from file if too large) |
| DataCharacteristics dc = mo.getDataCharacteristics(); |
| boolean fromFile = false; |
| if( !mo.isFederated() && (!OptimizerUtils.checkSparkCollectMemoryBudget(dc, 0) |
| || !_parRDDs.reserve(OptimizerUtils.estimatePartitionedSizeExactSparsity(dc)))) { |
| if( mo.isDirty() || !mo.isHDFSFileExists() ) //write if necessary |
| mo.exportData(); |
| rdd = sc.hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.keyClass, inputInfo.valueClass); |
| rdd = SparkUtils.copyBinaryBlockMatrix((JavaPairRDD<MatrixIndexes, MatrixBlock>)rdd); //cp is workaround for read bug |
| fromFile = true; |
| } |
| else { //default case |
| MatrixBlock mb = mo.acquireRead(); //pin matrix in memory |
| rdd = toMatrixJavaPairRDD(sc, mb, (int)mo.getBlocksize(), numParts, inclEmpty); |
| mo.release(); //unpin matrix |
| _parRDDs.registerRDD(rdd.id(), OptimizerUtils.estimatePartitionedSizeExactSparsity(dc), true); |
| } |
| |
| //keep rdd handle for future operations on it |
| RDDObject rddhandle = new RDDObject(rdd); |
| rddhandle.setHDFSFile(fromFile); |
| rddhandle.setParallelizedRDD(!fromFile); |
| mo.setRDDHandle(rddhandle); |
| } |
| //CASE 3: non-dirty (file exists on HDFS) |
| else |
| { |
| // parallelize hdfs-resident file |
| // For binary block, these are: SequenceFileInputFormat.class, MatrixIndexes.class, MatrixBlock.class |
| if(fmt == FileFormat.BINARY) { |
| rdd = sc.hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.keyClass, inputInfo.valueClass); |
				//note: this copy is still required in Spark 1.4 because spark hands out whatever the inputformat
				//recordreader returns; the javadoc explicitly recommends copying all key/value pairs
| rdd = SparkUtils.copyBinaryBlockMatrix((JavaPairRDD<MatrixIndexes, MatrixBlock>)rdd); //cp is workaround for read bug |
| } |
| else if(fmt.isTextFormat()) { |
| rdd = sc.hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.keyClass, inputInfo.valueClass); |
| rdd = ((JavaPairRDD<LongWritable, Text>)rdd).mapToPair( new CopyTextInputFunction() ); //cp is workaround for read bug |
| } |
| else { |
| throw new DMLRuntimeException("Incorrect input format in getRDDHandleForVariable"); |
| } |
| |
| //keep rdd handle for future operations on it |
| RDDObject rddhandle = new RDDObject(rdd); |
| rddhandle.setHDFSFile(true); |
| mo.setRDDHandle(rddhandle); |
| } |
| |
| return rdd; |
| } |
| |
| @SuppressWarnings("resource") |
| public JavaPairRDD<?, ?> getRDDHandleForTensorObject(TensorObject to, FileFormat fmt, int numParts, boolean inclEmpty) { |
		//NOTE: this logic should eventually be integrated into TensorObject.
		//However, for now we cannot assume that spark libraries are always
		//available and hence only store generic references in the tensor
		//object while all the logic resides in the SparkExecutionContext.
| |
| JavaSparkContext sc = getSparkContext(); |
| JavaPairRDD<?, ?> rdd; |
		//CASE 1: rdd already existing (reuse if checkpoint, or trigger pending
		//rdd operations if not yet cached, but prevent re-evaluation of rdd
		//operations if already executed and cached)
| if (to.getRDDHandle() != null && (to.getRDDHandle().isCheckpointRDD() || !to.isCached(false))) { |
| //return existing rdd handling (w/o input format change) |
| rdd = to.getRDDHandle().getRDD(); |
| } |
| //CASE 2: dirty in memory data or cached result of rdd operations |
| else if (to.isDirty() || to.isCached(false)) { |
			//get in-memory tensor block and parallelize it
| //w/ guarded parallelize (fallback to export, rdd from file if too large) |
| DataCharacteristics dc = to.getDataCharacteristics(); |
| //boolean fromFile = false; |
| if (!OptimizerUtils.checkSparkCollectMemoryBudget(dc, 0) || !_parRDDs.reserve( |
| OptimizerUtils.estimatePartitionedSizeExactSparsity(dc))) { |
| if (to.isDirty() || !to.isHDFSFileExists()) //write if necessary |
| to.exportData(); |
| // TODO implement hadoop read write for tensor |
| throw new DMLRuntimeException("Tensor can not yet be written or read to hadoopFile"); |
| /*rdd = sc.hadoopFile(to.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass); |
| rdd = SparkUtils.copyBinaryBlockTensor((JavaPairRDD<TensorIndexes, HomogTensor>) rdd); //cp is workaround for read bug |
| fromFile = true;*/ |
| } else { //default case |
				TensorBlock tb = to.acquireRead(); //pin tensor in memory
| int blen = dc.getBlocksize(); |
| rdd = toTensorJavaPairRDD(sc, tb, blen, numParts, inclEmpty); |
				to.release(); //unpin tensor
| _parRDDs.registerRDD(rdd.id(), OptimizerUtils.estimatePartitionedSizeExactSparsity(dc), true); |
| } |
| |
| //keep rdd handle for future operations on it |
| RDDObject rddhandle = new RDDObject(rdd); |
| // TODO use fromFile instead of false and true |
| rddhandle.setHDFSFile(false); |
| rddhandle.setParallelizedRDD(true); |
| to.setRDDHandle(rddhandle); |
| } |
| //CASE 3: non-dirty (file exists on HDFS) |
| else { |
| // parallelize hdfs-resident file |
			// For binary block, these would be: SequenceFileInputFormat.class, TensorIndexes.class, TensorBlock.class
| if (fmt == FileFormat.BINARY) { |
| // TODO implement hadoop read write for tensor |
| throw new DMLRuntimeException("Tensor can not yet be written or read to hadoopFile"); |
| //rdd = sc.hadoopFile(to.getFileName(), inputInfo.inputFormatClass, inputInfo.inputKeyClass, inputInfo.inputValueClass); |
| //note: this copy is still required in Spark 1.4 because spark hands out whatever the inputformat |
| //recordreader returns; the javadoc explicitly recommend to copy all key/value pairs |
| //rdd = SparkUtils.copyBinaryBlockTensor((JavaPairRDD<TensorIndexes, HomogTensor>) rdd); //cp is workaround for read bug |
| // TODO: TensorMarket? |
| } else { |
| // TODO support other Input formats |
| throw new DMLRuntimeException("Incorrect input format in getRDDHandleForVariable"); |
| } |
| |
| //keep rdd handle for future operations on it |
| //RDDObject rddhandle = new RDDObject(rdd); |
| //rddhandle.setHDFSFile(true); |
| //to.setRDDHandle(rddhandle); |
| } |
| |
| return rdd; |
| } |
| |
| /** |
| * FIXME: currently this implementation assumes matrix representations but frame signature |
| * in order to support the old transform implementation. |
| * |
| * @param fo frame object |
| * @param fmt file format type |
| * @return JavaPairRDD handle for a frame object |
| */ |
| @SuppressWarnings({ "unchecked", "resource" }) |
| public JavaPairRDD<?,?> getRDDHandleForFrameObject( FrameObject fo, FileFormat fmt ) |
| { |
		//NOTE: this logic should eventually be integrated into FrameObject.
		//However, for now we cannot assume that spark libraries are always
		//available and hence only store generic references in the frame
		//object while all the logic resides in the SparkExecutionContext.
| |
| InputOutputInfo inputInfo2 = InputOutputInfo.get(DataType.FRAME, fmt); |
| JavaSparkContext sc = getSparkContext(); |
| JavaPairRDD<?,?> rdd = null; |
		//CASE 1: rdd already existing (reuse if checkpoint, or trigger pending
		//rdd operations if not yet cached, but prevent re-evaluation of rdd
		//operations if already executed and cached)
| if( fo.getRDDHandle()!=null |
| && (fo.getRDDHandle().isCheckpointRDD() || !fo.isCached(false)) ) |
| { |
| //return existing rdd handling (w/o input format change) |
| rdd = fo.getRDDHandle().getRDD(); |
| } |
| //CASE 2: dirty in memory data or cached result of rdd operations |
| else if( fo.isDirty() || fo.isCached(false) ) |
| { |
			//get in-memory frame block and parallelize it
| //w/ guarded parallelize (fallback to export, rdd from file if too large) |
| DataCharacteristics dc = fo.getDataCharacteristics(); |
| boolean fromFile = false; |
| if( !OptimizerUtils.checkSparkCollectMemoryBudget(dc, 0) || !_parRDDs.reserve( |
| OptimizerUtils.estimatePartitionedSizeExactSparsity(dc)) ) { |
| if( fo.isDirty() ) { //write only if necessary |
| fo.exportData(); |
| } |
| rdd = sc.hadoopFile( fo.getFileName(), inputInfo2.inputFormatClass, inputInfo2.keyClass, inputInfo2.valueClass); |
| rdd = ((JavaPairRDD<LongWritable, FrameBlock>)rdd).mapToPair( new CopyFrameBlockPairFunction() ); //cp is workaround for read bug |
| fromFile = true; |
| } |
| else { //default case |
| FrameBlock fb = fo.acquireRead(); //pin frame in memory |
| rdd = toFrameJavaPairRDD(sc, fb); |
| fo.release(); //unpin frame |
| _parRDDs.registerRDD(rdd.id(), OptimizerUtils.estimatePartitionedSizeExactSparsity(dc), true); |
| } |
| |
| //keep rdd handle for future operations on it |
| RDDObject rddhandle = new RDDObject(rdd); |
| rddhandle.setHDFSFile(fromFile); |
| fo.setRDDHandle(rddhandle); |
| } |
| //CASE 3: non-dirty (file exists on HDFS) |
| else |
| { |
| // parallelize hdfs-resident file |
			// For binary block, these are: SequenceFileInputFormat.class, LongWritable.class, FrameBlock.class
| if(fmt == FileFormat.BINARY) { |
| rdd = sc.hadoopFile( fo.getFileName(), inputInfo2.inputFormatClass, inputInfo2.keyClass, inputInfo2.valueClass); |
				//note: this copy is still required in Spark 1.4 because spark hands out whatever the inputformat
				//recordreader returns; the javadoc explicitly recommends copying all key/value pairs
| rdd = ((JavaPairRDD<LongWritable, FrameBlock>)rdd).mapToPair( new CopyFrameBlockPairFunction() ); //cp is workaround for read bug |
| } |
| else if(fmt.isTextFormat()) { |
| rdd = sc.hadoopFile( fo.getFileName(), inputInfo2.inputFormatClass, inputInfo2.keyClass, inputInfo2.valueClass); |
| rdd = ((JavaPairRDD<LongWritable, Text>)rdd).mapToPair( new CopyTextInputFunction() ); //cp is workaround for read bug |
| } |
| else { |
| throw new DMLRuntimeException("Incorrect input format in getRDDHandleForVariable"); |
| } |
| |
| //keep rdd handle for future operations on it |
| RDDObject rddhandle = new RDDObject(rdd); |
| rddhandle.setHDFSFile(true); |
| fo.setRDDHandle(rddhandle); |
| } |
| |
| return rdd; |
| } |
| |
| public Broadcast<CacheBlock> broadcastVariable(CacheableData<CacheBlock> cd) { |
| long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; |
| Broadcast<CacheBlock> brBlock = null; |
| |
| // reuse existing non partitioned broadcast handle |
| if (cd.getBroadcastHandle() != null && cd.getBroadcastHandle().isNonPartitionedBroadcastValid()) { |
| brBlock = cd.getBroadcastHandle().getNonPartitionedBroadcast(); |
| } |
| |
| if (brBlock == null) { |
| //create new broadcast handle (never created, evicted) |
| // account for overwritten invalid broadcast (e.g., evicted) |
| if (cd.getBroadcastHandle() != null) |
| CacheableData.addBroadcastSize(-cd.getBroadcastHandle().getNonPartitionedBroadcastSize()); |
| |
			// read the cache block (matrix or frame)
| CacheBlock cb = cd.acquireRead(); |
| cd.release(); |
| |
			// broadcast a non-empty block whose serialized size is smaller than 2GB
| if (cb.getExactSerializedSize() > 0 && cb.getExactSerializedSize() <= Integer.MAX_VALUE) { |
| brBlock = getSparkContext().broadcast(cb); |
| // create the broadcast handle if the matrix or frame has never been broadcasted |
| if (cd.getBroadcastHandle() == null) { |
| cd.setBroadcastHandle(new BroadcastObject<>()); |
| } |
| cd.getBroadcastHandle().setNonPartitionedBroadcast(brBlock, |
| OptimizerUtils.estimateSize(cd.getDataCharacteristics())); |
| CacheableData.addBroadcastSize(cd.getBroadcastHandle().getNonPartitionedBroadcastSize()); |
| |
| if (DMLScript.STATISTICS) { |
| Statistics.accSparkBroadCastTime(System.nanoTime() - t0); |
| Statistics.incSparkBroadcastCount(1); |
| } |
| } |
| } |
| return brBlock; |
| } |
| |
| @SuppressWarnings("unchecked") |
| public PartitionedBroadcast<MatrixBlock> getBroadcastForMatrixObject(MatrixObject mo) { |
| //NOTE: The memory consumption of this method is the in-memory size of the |
| //matrix object plus the partitioned size in 1k-1k blocks. Since the call |
| //to broadcast happens after the matrix object has been released, the memory |
| //requirements of blockified chunks in Spark's block manager are covered under |
| //this maximum. Also note that we explicitly clear the in-memory blocks once |
| //the broadcasts are created (other than in local mode) in order to avoid |
| //unnecessary memory requirements during the lifetime of this broadcast handle. |
| |
| long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; |
| |
| PartitionedBroadcast<MatrixBlock> bret = null; |
| |
| //reuse existing broadcast handle |
| if (mo.getBroadcastHandle() != null && mo.getBroadcastHandle().isPartitionedBroadcastValid()) { |
| bret = mo.getBroadcastHandle().getPartitionedBroadcast(); |
| } |
| |
| //create new broadcast handle (never created, evicted) |
| if (bret == null) { |
| //account for overwritten invalid broadcast (e.g., evicted) |
| if (mo.getBroadcastHandle() != null) |
| CacheableData.addBroadcastSize(-mo.getBroadcastHandle().getPartitionedBroadcastSize()); |
| |
| //obtain meta data for matrix |
| int blen = (int) mo.getBlocksize(); |
| |
| //create partitioned matrix block and release memory consumed by input |
| MatrixBlock mb = mo.acquireRead(); |
| PartitionedBlock<MatrixBlock> pmb = new PartitionedBlock<>(mb, blen); |
| mo.release(); |
| |
| //determine coarse-grained partitioning |
| int numPerPart = PartitionedBroadcast.computeBlocksPerPartition(mo.getNumRows(), mo.getNumColumns(), blen); |
| int numParts = (int) Math.ceil((double) pmb.getNumRowBlocks() * pmb.getNumColumnBlocks() / numPerPart); |
| Broadcast<PartitionedBlock<MatrixBlock>>[] ret = new Broadcast[numParts]; |
| |
| //create coarse-grained partitioned broadcasts |
| if (numParts > 1) { |
| Arrays.parallelSetAll(ret, i -> createPartitionedBroadcast(pmb, numPerPart, i)); |
| } else { //single partition |
| ret[0] = getSparkContext().broadcast(pmb); |
| if (!isLocalMaster()) |
| pmb.clearBlocks(); |
| } |
| |
| bret = new PartitionedBroadcast<>(ret, mo.getDataCharacteristics()); |
| // create the broadcast handle if the matrix or frame has never been broadcasted |
| if (mo.getBroadcastHandle() == null) { |
| mo.setBroadcastHandle(new BroadcastObject<MatrixBlock>()); |
| } |
| mo.getBroadcastHandle().setPartitionedBroadcast(bret, |
| OptimizerUtils.estimatePartitionedSizeExactSparsity(mo.getDataCharacteristics())); |
| CacheableData.addBroadcastSize(mo.getBroadcastHandle().getPartitionedBroadcastSize()); |
| } |
| |
| if (DMLScript.STATISTICS) { |
| Statistics.accSparkBroadCastTime(System.nanoTime() - t0); |
| Statistics.incSparkBroadcastCount(1); |
| } |
| |
| return bret; |
| } |
| |
| @SuppressWarnings("unchecked") |
| public PartitionedBroadcast<TensorBlock> getBroadcastForTensorObject(TensorObject to) { |
| long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; |
| |
| PartitionedBroadcast<TensorBlock> bret = null; |
| |
| //reuse existing broadcast handle |
| if (to.getBroadcastHandle() != null && to.getBroadcastHandle().isPartitionedBroadcastValid()) { |
| bret = to.getBroadcastHandle().getPartitionedBroadcast(); |
| } |
| |
| //create new broadcast handle (never created, evicted) |
| if (bret == null) { |
| //account for overwritten invalid broadcast (e.g., evicted) |
| if (to.getBroadcastHandle() != null) |
| CacheableData.addBroadcastSize(-to.getBroadcastHandle().getPartitionedBroadcastSize()); |
| |
			//obtain meta data for tensor
| DataCharacteristics dc = to.getDataCharacteristics(); |
| long[] dims = dc.getDims(); |
| int blen = dc.getBlocksize(); |
| |
			//create partitioned tensor block and release memory consumed by input
| PartitionedBlock<TensorBlock> pmb = new PartitionedBlock<>(to.acquireReadAndRelease(), dims, blen); |
| |
| //determine coarse-grained partitioning |
| int numPerPart = PartitionedBroadcast.computeBlocksPerPartition(dims, blen); |
| int numParts = (int) Math.ceil((double) pmb.getNumRowBlocks() * pmb.getNumColumnBlocks() / numPerPart); |
| Broadcast<PartitionedBlock<TensorBlock>>[] ret = new Broadcast[numParts]; |
| |
| //create coarse-grained partitioned broadcasts |
| if (numParts > 1) { |
| Arrays.parallelSetAll(ret, i -> createPartitionedBroadcast(pmb, numPerPart, i)); |
| } else { //single partition |
| ret[0] = getSparkContext().broadcast(pmb); |
| if (!isLocalMaster()) |
| pmb.clearBlocks(); |
| } |
| |
| bret = new PartitionedBroadcast<>(ret, to.getDataCharacteristics()); |
			// create the broadcast handle if the tensor has never been broadcasted
			if (to.getBroadcastHandle() == null) {
				to.setBroadcastHandle(new BroadcastObject<TensorBlock>());
| } |
| to.getBroadcastHandle().setPartitionedBroadcast(bret, |
| OptimizerUtils.estimatePartitionedSizeExactSparsity(to.getDataCharacteristics())); |
| CacheableData.addBroadcastSize(to.getBroadcastHandle().getPartitionedBroadcastSize()); |
| } |
| |
| if (DMLScript.STATISTICS) { |
| Statistics.accSparkBroadCastTime(System.nanoTime() - t0); |
| Statistics.incSparkBroadcastCount(1); |
| } |
| |
| return bret; |
| } |
| |
| public PartitionedBroadcast<MatrixBlock> getBroadcastForVariable(String varname) { |
| return getBroadcastForMatrixObject(getMatrixObject(varname)); |
| } |
| |
| public PartitionedBroadcast<TensorBlock> getBroadcastForTensorVariable(String varname) { |
| return getBroadcastForTensorObject(getTensorObject(varname)); |
| } |
| |
| @SuppressWarnings("unchecked") |
| public PartitionedBroadcast<FrameBlock> getBroadcastForFrameVariable(String varname) { |
| long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; |
| |
| FrameObject fo = getFrameObject(varname); |
| |
| PartitionedBroadcast<FrameBlock> bret = null; |
| |
| //reuse existing broadcast handle |
| if (fo.getBroadcastHandle() != null && fo.getBroadcastHandle().isPartitionedBroadcastValid()) { |
| bret = fo.getBroadcastHandle().getPartitionedBroadcast(); |
| } |
| |
| //create new broadcast handle (never created, evicted) |
| if (bret == null) { |
| //account for overwritten invalid broadcast (e.g., evicted) |
| if (fo.getBroadcastHandle() != null) |
| CacheableData.addBroadcastSize(-fo.getBroadcastHandle().getPartitionedBroadcastSize()); |
| |
| //obtain meta data for frame |
| int blen = OptimizerUtils.getDefaultFrameSize(); |
| |
| //create partitioned frame block and release memory consumed by input |
| FrameBlock mb = fo.acquireRead(); |
| PartitionedBlock<FrameBlock> pmb = new PartitionedBlock<>(mb, blen); |
| fo.release(); |
| |
| //determine coarse-grained partitioning |
| int numPerPart = PartitionedBroadcast.computeBlocksPerPartition(fo.getNumRows(), fo.getNumColumns(), blen); |
| int numParts = (int) Math.ceil((double) pmb.getNumRowBlocks() * pmb.getNumColumnBlocks() / numPerPart); |
| Broadcast<PartitionedBlock<FrameBlock>>[] ret = new Broadcast[numParts]; |
| |
| //create coarse-grained partitioned broadcasts |
| if (numParts > 1) { |
| Arrays.parallelSetAll(ret, i -> createPartitionedBroadcast(pmb, numPerPart, i)); |
| } else { //single partition |
| ret[0] = getSparkContext().broadcast(pmb); |
| if (!isLocalMaster()) |
| pmb.clearBlocks(); |
| } |
| |
| bret = new PartitionedBroadcast<>(ret, new MatrixCharacteristics( |
| fo.getDataCharacteristics()).setBlocksize(blen)); |
| if (fo.getBroadcastHandle() == null) |
| fo.setBroadcastHandle(new BroadcastObject<FrameBlock>()); |
| |
| fo.getBroadcastHandle().setPartitionedBroadcast(bret, |
| OptimizerUtils.estimatePartitionedSizeExactSparsity(fo.getDataCharacteristics())); |
| CacheableData.addBroadcastSize(fo.getBroadcastHandle().getPartitionedBroadcastSize()); |
| } |
| |
| if (DMLScript.STATISTICS) { |
| Statistics.accSparkBroadCastTime(System.nanoTime() - t0); |
| Statistics.incSparkBroadcastCount(1); |
| } |
| |
| return bret; |
| } |
| |
| private Broadcast<PartitionedBlock<? extends CacheBlock>> createPartitionedBroadcast( |
| PartitionedBlock<? extends CacheBlock> pmb, int numPerPart, int pos) { |
| int offset = pos * numPerPart; |
| int numBlks = Math.min(numPerPart, pmb.getNumRowBlocks() * pmb.getNumColumnBlocks() - offset); |
| PartitionedBlock<? extends CacheBlock> tmp = pmb.createPartition(offset, numBlks); |
| Broadcast<PartitionedBlock<? extends CacheBlock>> ret = getSparkContext().broadcast(tmp); |
| if (!isLocalMaster()) |
| tmp.clearBlocks(); |
| return ret; |
| } |
| |
| /** |
| * Keep the output rdd of spark rdd operations as meta data of matrix/frame |
| * objects in the symbol table. |
| * |
| * @param varname variable name |
| * @param rdd JavaPairRDD handle for variable |
| */ |
| public void setRDDHandleForVariable(String varname, JavaPairRDD<?,?> rdd) { |
| CacheableData<?> obj = getCacheableData(varname); |
| RDDObject rddhandle = new RDDObject(rdd); |
| obj.setRDDHandle( rddhandle ); |
| } |
| |
| public static JavaPairRDD<MatrixIndexes,MatrixBlock> toMatrixJavaPairRDD(JavaSparkContext sc, MatrixBlock src, int blen) { |
| return toMatrixJavaPairRDD(sc, src, blen, -1, true); |
| } |
| |
| public static JavaPairRDD<MatrixIndexes,MatrixBlock> toMatrixJavaPairRDD(JavaSparkContext sc, MatrixBlock src, |
| int blen, int numParts, boolean inclEmpty) { |
| long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; |
| List<Tuple2<MatrixIndexes,MatrixBlock>> list = null; |
| |
| if( src.getNumRows() <= blen && src.getNumColumns() <= blen ) { |
| list = Arrays.asList(new Tuple2<>(new MatrixIndexes(1,1), src)); |
| } |
| else { |
| MatrixCharacteristics mc = new MatrixCharacteristics( |
| src.getNumRows(), src.getNumColumns(), blen, src.getNonZeros()); |
| list = LongStream.range(0, mc.getNumBlocks()).parallel() |
| .mapToObj(i -> createIndexedMatrixBlock(src, mc, i)) |
| .filter(kv -> inclEmpty || !kv._2.isEmptyBlock(false)) |
| .collect(Collectors.toList()); |
| } |
| |
| JavaPairRDD<MatrixIndexes,MatrixBlock> result = (numParts > 1) ? |
| sc.parallelizePairs(list, numParts) : sc.parallelizePairs(list); |
| |
| if (DMLScript.STATISTICS) { |
| Statistics.accSparkParallelizeTime(System.nanoTime() - t0); |
| Statistics.incSparkParallelizeCount(1); |
| } |
| |
| return result; |
| } |
| |
| public static JavaPairRDD<TensorIndexes, TensorBlock> toTensorJavaPairRDD(JavaSparkContext sc, TensorBlock src, int blen) { |
| return toTensorJavaPairRDD(sc, src, blen, -1, true); |
| } |
| |
| public static JavaPairRDD<TensorIndexes, TensorBlock> toTensorJavaPairRDD(JavaSparkContext sc, TensorBlock src, |
| int blen, int numParts, boolean inclEmpty) { |
| long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; |
| List<Tuple2<TensorIndexes, TensorBlock>> list; |
| |
		//single block if every dimension fits into one block (consistent w/ matrix case)
		int numDims = src.getNumDims();
		boolean singleBlock = true;
		for (int i = 0; i < numDims; i++) {
			if (src.getDim(i) > blen) { //dimension spans multiple blocks
				singleBlock = false;
				break;
			}
		}
| if (singleBlock) { |
| long[] ix = new long[numDims]; |
| Arrays.fill(ix, 1); |
| list = Arrays.asList(new Tuple2<>(new TensorIndexes(ix), src)); |
| } else { |
| // TODO rows and columns for matrix characteristics |
| long[] dims = src.getLongDims(); |
| TensorCharacteristics mc = new TensorCharacteristics(dims, src.getNonZeros()); |
| list = LongStream.range(0, mc.getNumBlocks()).parallel() |
| .mapToObj(i -> createIndexedTensorBlock(src, mc, i)) |
| .filter(kv -> inclEmpty || !kv._2.isEmpty(false)) |
| .collect(Collectors.toList()); |
| } |
| |
| JavaPairRDD<TensorIndexes, TensorBlock> result = (numParts > 1) ? |
| sc.parallelizePairs(list, numParts) : sc.parallelizePairs(list); |
| |
| if (DMLScript.STATISTICS) { |
| Statistics.accSparkParallelizeTime(System.nanoTime() - t0); |
| Statistics.incSparkParallelizeCount(1); |
| } |
| |
| return result; |
| } |
| |
| private static Tuple2<MatrixIndexes,MatrixBlock> createIndexedMatrixBlock(MatrixBlock mb, MatrixCharacteristics mc, long ix) { |
| try { |
| //compute block indexes |
| long blockRow = ix / mc.getNumColBlocks(); |
| long blockCol = ix % mc.getNumColBlocks(); |
| //compute block sizes |
| int maxRow = UtilFunctions.computeBlockSize(mc.getRows(), blockRow+1, mc.getBlocksize()); |
| int maxCol = UtilFunctions.computeBlockSize(mc.getCols(), blockCol+1, mc.getBlocksize()); |
| //copy sub-matrix to block |
| MatrixBlock block = new MatrixBlock(maxRow, maxCol, mb.isInSparseFormat()); |
| int row_offset = (int)blockRow*mc.getBlocksize(); |
| int col_offset = (int)blockCol*mc.getBlocksize(); |
| block = mb.slice( row_offset, row_offset+maxRow-1, |
| col_offset, col_offset+maxCol-1, block ); |
| //create key-value pair |
| return new Tuple2<>(new MatrixIndexes(blockRow+1, blockCol+1), block); |
| } |
| catch(DMLRuntimeException ex) { |
| throw new RuntimeException(ex); |
| } |
| } |
| |
| private static Tuple2<TensorIndexes, TensorBlock> createIndexedTensorBlock(TensorBlock mb, TensorCharacteristics tc, long ix) { |
| try { |
| //compute block indexes |
| long[] blockIx = UtilFunctions.computeTensorIndexes(tc, ix); |
| int[] outDims = new int[tc.getNumDims()]; |
| int[] offset = new int[tc.getNumDims()]; |
| UtilFunctions.computeSliceInfo(tc, blockIx, outDims, offset); |
| TensorBlock outBlock = new TensorBlock(mb.getValueType(), outDims); |
| outBlock = mb.slice(offset, outBlock); |
| return new Tuple2<>(new TensorIndexes(blockIx), outBlock); |
| } |
| catch(DMLRuntimeException ex) { |
| throw new RuntimeException(ex); |
| } |
| } |
| |
| public static JavaPairRDD<Long,FrameBlock> toFrameJavaPairRDD(JavaSparkContext sc, FrameBlock src) { |
| long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; |
| LinkedList<Tuple2<Long,FrameBlock>> list = new LinkedList<>(); |
| |
		//create sub-blocks of the frame (row-wise blocking)
| int blksize = ConfigurationManager.getBlocksize(); |
| for(int blockRow = 0; blockRow < (int)Math.ceil(src.getNumRows()/(double)blksize); blockRow++) |
| { |
| int maxRow = (blockRow*blksize + blksize < src.getNumRows()) ? blksize : src.getNumRows() - blockRow*blksize; |
| int roffset = blockRow*blksize; |
| |
| FrameBlock block = new FrameBlock(src.getSchema()); |
| |
| //copy sub frame to block, incl meta data on first |
| src.slice( roffset, roffset+maxRow-1, 0, src.getNumColumns()-1, block ); |
| if( roffset == 0 ) |
| block.setColumnMetadata(src.getColumnMetadata()); |
| |
			//append block to output list (keyed by one-based row offset)
| list.addLast(new Tuple2<>((long)roffset+1, block)); |
| } |
| |
| JavaPairRDD<Long,FrameBlock> result = sc.parallelizePairs(list); |
| if (DMLScript.STATISTICS) { |
| Statistics.accSparkParallelizeTime(System.nanoTime() - t0); |
| Statistics.incSparkParallelizeCount(1); |
| } |
| |
| return result; |
| } |
| |
| /** |
| * This method is a generic abstraction for calls from the buffer pool. |
| * |
| * @param rdd rdd object |
| * @param rlen number of rows |
| * @param clen number of columns |
| * @param blen block length |
| * @param nnz number of non-zeros |
| * @return matrix block |
| */ |
| @SuppressWarnings("unchecked") |
| public static MatrixBlock toMatrixBlock(RDDObject rdd, int rlen, int clen, int blen, long nnz) { |
| return toMatrixBlock( |
| (JavaPairRDD<MatrixIndexes, MatrixBlock>) rdd.getRDD(), |
| rlen, clen, blen, nnz); |
| } |
| |
| /** |
| * Utility method for creating a single matrix block out of a binary block RDD. |
| * Note that this collect call might trigger execution of any pending transformations. |
| * |
| * NOTE: This is an unguarded utility function, which requires memory for both the output matrix |
| * and its collected, blocked representation. |
| * |
| * @param rdd JavaPairRDD for matrix block |
| * @param rlen number of rows |
| * @param clen number of columns |
| * @param blen block length |
| * @param nnz number of non-zeros |
| * @return matrix block |
| */ |
| public static MatrixBlock toMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, int blen, long nnz) { |
| long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; |
| |
| MatrixBlock out = null; |
| |
| if( rlen <= blen && clen <= blen ) //SINGLE BLOCK |
| { |
| //special case without copy and nnz maintenance |
| List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect(); |
| |
| if( list.size()>1 ) |
| throw new DMLRuntimeException("Expecting no more than one result block."); |
| else if( list.size()==1 ) |
| out = list.get(0)._2(); |
| else //empty (e.g., after ops w/ outputEmpty=false) |
| out = new MatrixBlock(rlen, clen, true); |
| out.examSparsity(); |
| } |
| else //MULTIPLE BLOCKS |
| { |
| //determine target sparse/dense representation |
| long lnnz = (nnz >= 0) ? nnz : (long)rlen * clen; |
| boolean sparse = MatrixBlock.evalSparseFormatInMemory(rlen, clen, lnnz); |
| |
| //create output matrix block (w/ lazy allocation) |
| out = new MatrixBlock(rlen, clen, sparse, lnnz); |
| |
| //kickoff asynchronous allocation |
| Future<MatrixBlock> fout = out.allocateBlockAsync(); |
| |
| //trigger pending RDD operations and collect blocks |
| List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect(); |
| out = IOUtilFunctions.get(fout); //wait for allocation |
| |
| //copy blocks one-at-a-time into output matrix block |
| long aNnz = 0; |
| for( Tuple2<MatrixIndexes,MatrixBlock> keyval : list ) |
| { |
| //unpack index-block pair |
| MatrixIndexes ix = keyval._1(); |
| MatrixBlock block = keyval._2(); |
| |
| //compute row/column block offsets |
| int row_offset = (int)(ix.getRowIndex()-1)*blen; |
| int col_offset = (int)(ix.getColumnIndex()-1)*blen; |
| int rows = block.getNumRows(); |
| int cols = block.getNumColumns(); |
| |
| //handle compressed blocks (decompress for robustness) |
| if( block instanceof CompressedMatrixBlock ) |
| block = ((CompressedMatrixBlock)block).decompress(); |
| |
| //append block |
| if( sparse ) { //SPARSE OUTPUT |
| //append block to sparse target in order to avoid shifting, where |
| //we use a shallow row copy in case of MCSR and single column blocks |
| //note: this append requires, for multiple column blocks, a final sort |
| out.appendToSparse(block, row_offset, col_offset, clen>blen); |
| } |
| else { //DENSE OUTPUT |
| out.copy( row_offset, row_offset+rows-1, |
| col_offset, col_offset+cols-1, block, false ); |
| } |
| |
| //incremental maintenance nnz |
| aNnz += block.getNonZeros(); |
| } |
| |
| //post-processing output matrix |
| if( sparse && clen>blen ) |
| out.sortSparseRows(); |
| out.setNonZeros(aNnz); |
| out.examSparsity(); |
| } |
| |
| if (DMLScript.STATISTICS) { |
| Statistics.accSparkCollectTime(System.nanoTime() - t0); |
| Statistics.incSparkCollectCount(1); |
| } |
| |
| return out; |
| } |
| |
| @SuppressWarnings("unchecked") |
| public static MatrixBlock toMatrixBlock(RDDObject rdd, int rlen, int clen, long nnz) { |
| return toMatrixBlock( |
| (JavaPairRDD<MatrixIndexes, MatrixCell>) rdd.getRDD(), |
| rlen, clen, nnz); |
| } |
| |
| /** |
| * Utility method for creating a single matrix block out of a binary cell RDD. |
| * Note that this collect call might trigger execution of any pending transformations. |
| * |
| * @param rdd JavaPairRDD for matrix block |
| * @param rlen number of rows |
| * @param clen number of columns |
| * @param nnz number of non-zeros |
| * @return matrix block |
| */ |
| public static MatrixBlock toMatrixBlock(JavaPairRDD<MatrixIndexes, MatrixCell> rdd, int rlen, int clen, long nnz) |
| { |
| long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; |
| |
| MatrixBlock out = null; |
| |
| //determine target sparse/dense representation |
| long lnnz = (nnz >= 0) ? nnz : (long)rlen * clen; |
| boolean sparse = MatrixBlock.evalSparseFormatInMemory(rlen, clen, lnnz); |
| |
| //create output matrix block (w/ lazy allocation) |
| out = new MatrixBlock(rlen, clen, sparse); |
| |
| List<Tuple2<MatrixIndexes,MatrixCell>> list = rdd.collect(); |
| |
		//copy cells one-at-a-time into output matrix block
| for( Tuple2<MatrixIndexes,MatrixCell> keyval : list ) |
| { |
| //unpack index-block pair |
| MatrixIndexes ix = keyval._1(); |
| MatrixCell cell = keyval._2(); |
| |
| //append cell to dense/sparse target in order to avoid shifting for sparse |
| //note: this append requires a final sort of sparse rows |
| out.appendValue((int)ix.getRowIndex()-1, (int)ix.getColumnIndex()-1, cell.getValue()); |
| } |
| |
| //post-processing output matrix |
| if( sparse ) |
| out.sortSparseRows(); |
| out.recomputeNonZeros(); |
| out.examSparsity(); |
| |
| if (DMLScript.STATISTICS) { |
| Statistics.accSparkCollectTime(System.nanoTime() - t0); |
| Statistics.incSparkCollectCount(1); |
| } |
| |
| return out; |
| } |
| |
| public static TensorBlock toTensorBlock(JavaPairRDD<TensorIndexes, TensorBlock> rdd, DataCharacteristics dc) { |
| long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; |
| |
| // TODO special case single block |
| int[] idims = dc.getIntDims(); |
| // TODO asynchronous allocation |
| List<Tuple2<TensorIndexes, TensorBlock>> list = rdd.collect(); |
| ValueType vt = (list.get(0)._2).getValueType(); |
| TensorBlock out = new TensorBlock(vt, idims).allocateBlock(); |
| |
		//copy blocks one-at-a-time into output tensor block
| for( Tuple2<TensorIndexes, TensorBlock> keyval : list ) |
| { |
| //unpack index-block pair |
| TensorIndexes ix = keyval._1(); |
| TensorBlock block = keyval._2(); |
| |
| //compute row/column block offsets |
| int[] lower = new int[ix.getNumDims()]; |
| int[] upper = new int[ix.getNumDims()]; |
| for (int i = 0; i < lower.length; i++) { |
| lower[i] = (int) ((ix.getIndex(i) - 1) * dc.getBlocksize()); |
| upper[i] = lower[i] + block.getDim(i) - 1; |
| } |
| upper[upper.length - 1]++; |
| for (int i = upper.length - 1; i > 0; i--) { |
| if (upper[i] == block.getDim(i)) { |
| upper[i] = 0; |
| upper[i - 1]++; |
| } |
| } |
| |
| // TODO sparse copy |
| out.copy(lower, upper, block); |
| // TODO keep track of nnz |
| } |
| |
| // TODO post-processing output tensor (nnz, sparsity) |
| |
| if (DMLScript.STATISTICS) { |
| Statistics.accSparkCollectTime(System.nanoTime() - t0); |
| Statistics.incSparkCollectCount(1); |
| } |
| return out; |
| } |
| |
| public static PartitionedBlock<MatrixBlock> toPartitionedMatrixBlock(JavaPairRDD<MatrixIndexes,MatrixBlock> rdd, int rlen, int clen, int blen, long nnz) |
| { |
| long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; |
| |
| PartitionedBlock<MatrixBlock> out = new PartitionedBlock<>(rlen, clen, blen); |
| List<Tuple2<MatrixIndexes,MatrixBlock>> list = rdd.collect(); |
| |
| //copy blocks one-at-a-time into output matrix block |
| for( Tuple2<MatrixIndexes,MatrixBlock> keyval : list ) { |
| //unpack index-block pair |
| MatrixIndexes ix = keyval._1(); |
| MatrixBlock block = keyval._2(); |
| out.setBlock((int)ix.getRowIndex(), (int)ix.getColumnIndex(), block); |
| } |
| |
| if (DMLScript.STATISTICS) { |
| Statistics.accSparkCollectTime(System.nanoTime() - t0); |
| Statistics.incSparkCollectCount(1); |
| } |
| |
| return out; |
| } |
| |
| @SuppressWarnings("unchecked") |
| public static FrameBlock toFrameBlock(RDDObject rdd, ValueType[] schema, int rlen, int clen) { |
| JavaPairRDD<Long,FrameBlock> lrdd = (JavaPairRDD<Long,FrameBlock>) rdd.getRDD(); |
| return toFrameBlock(lrdd, schema, rlen, clen); |
| } |
| |
| public static FrameBlock toFrameBlock(JavaPairRDD<Long,FrameBlock> rdd, ValueType[] schema, int rlen, int clen) { |
| long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0; |
| |
| if(schema == null) |
| schema = UtilFunctions.nCopies(clen, ValueType.STRING); |
| |
| //create output frame block (w/ lazy allocation) |
| FrameBlock out = new FrameBlock(schema); |
| out.ensureAllocatedColumns(rlen); |
| |
| List<Tuple2<Long,FrameBlock>> list = rdd.collect(); |
| |
		//copy blocks one-at-a-time into output frame block
| for( Tuple2<Long,FrameBlock> keyval : list ) |
| { |
| //unpack index-block pair |
| int ix = (int)(keyval._1() - 1); |
| FrameBlock block = keyval._2(); |
| |
| //copy into output frame |
| out.copy( ix, ix+block.getNumRows()-1, 0, block.getNumColumns()-1, block ); |
| if( ix == 0 ) { |
| out.setColumnNames(block.getColumnNames()); |
| out.setColumnMetadata(block.getColumnMetadata()); |
| } |
| } |
| |
| if (DMLScript.STATISTICS) { |
| Statistics.accSparkCollectTime(System.nanoTime() - t0); |
| Statistics.incSparkCollectCount(1); |
| } |
| |
| return out; |
| } |
| |
| @SuppressWarnings("unchecked") |
| public static long writeMatrixRDDtoHDFS( RDDObject rdd, String path, FileFormat fmt ) |
| { |
| JavaPairRDD<MatrixIndexes,MatrixBlock> lrdd = (JavaPairRDD<MatrixIndexes, MatrixBlock>) rdd.getRDD(); |
| InputOutputInfo oinfo = InputOutputInfo.get(DataType.MATRIX, fmt); |
| |
| //piggyback nnz maintenance on write |
| LongAccumulator aNnz = getSparkContextStatic().sc().longAccumulator("nnz"); |
| lrdd = lrdd.mapValues(new ComputeBinaryBlockNnzFunction(aNnz)); |
| |
| //save file is an action which also triggers nnz maintenance |
| lrdd.saveAsHadoopFile(path, |
| oinfo.keyClass, |
| oinfo.valueClass, |
| oinfo.outputFormatClass); |
| |
| //return nnz aggregate of all blocks |
| return aNnz.value(); |
| } |
| |
| @SuppressWarnings("unchecked") |
| public static void writeFrameRDDtoHDFS( RDDObject rdd, String path, FileFormat fmt) |
| { |
| JavaPairRDD<?, FrameBlock> lrdd = (JavaPairRDD<Long, FrameBlock>) rdd.getRDD(); |
| InputOutputInfo oinfo = InputOutputInfo.get(DataType.FRAME, fmt); |
| |
| //convert keys to writables if necessary |
| if( fmt == FileFormat.BINARY ) { |
| lrdd = ((JavaPairRDD<Long, FrameBlock>)lrdd).mapToPair( |
| new LongFrameToLongWritableFrameFunction()); |
| } |
| |
		//save file is an action that triggers the pending rdd operations
| lrdd.saveAsHadoopFile(path, |
| oinfo.keyClass, |
| oinfo.valueClass, |
| oinfo.outputFormatClass); |
| } |
| |
| /////////////////////////////////////////// |
| // Cleanup of RDDs and Broadcast variables |
| /////// |
| |
| /** |
| * Adds a child rdd object to the lineage of a parent rdd. |
| * |
| * @param varParent parent variable |
| * @param varChild child variable |
| */ |
| public void addLineageRDD(String varParent, String varChild) { |
| RDDObject parent = getCacheableData(varParent).getRDDHandle(); |
| RDDObject child = getCacheableData(varChild).getRDDHandle(); |
| parent.addLineageChild( child ); |
| } |
| |
| /** |
| * Adds a child broadcast object to the lineage of a parent rdd. |
| * |
| * @param varParent parent variable |
| * @param varChild child variable |
| */ |
| public void addLineageBroadcast(String varParent, String varChild) { |
| RDDObject parent = getCacheableData(varParent).getRDDHandle(); |
| BroadcastObject<?> child = getCacheableData(varChild).getBroadcastHandle(); |
| parent.addLineageChild( child ); |
| } |
| |
| public void addLineage(String varParent, String varChild, boolean broadcast) { |
| if( broadcast ) |
| addLineageBroadcast(varParent, varChild); |
| else |
| addLineageRDD(varParent, varChild); |
| } |
| |
| @Override |
| public void cleanupCacheableData(CacheableData<?> mo) |
| { |
		//NOTE: this method overrides the default behavior of cleanupMatrixObject
		//and hence is transparently used by rmvar instructions and other users. The
		//core difference is the lineage-based cleanup of RDD and broadcast variables.
| |
| if (DMLScript.JMLC_MEM_STATISTICS) |
| Statistics.removeCPMemObject(System.identityHashCode(mo)); |
| |
| if( !mo.isCleanupEnabled() ) |
| return; |
| |
| try |
| { |
| //compute ref count only if matrix cleanup actually necessary |
| if( !getVariables().hasReferences(mo) ) { |
| //clean cached data |
| mo.clearData(getTID()); |
| |
| //clean hdfs data if no pending rdd operations on it |
| if( mo.isHDFSFileExists() && mo.getFileName()!=null ) { |
| if( mo.getRDDHandle()==null ) { |
| HDFSTool.deleteFileWithMTDIfExistOnHDFS(mo.getFileName()); |
| } |
| else { //deferred file removal |
| RDDObject rdd = mo.getRDDHandle(); |
| rdd.setHDFSFilename(mo.getFileName()); |
| } |
| } |
| |
| //cleanup RDD and broadcast variables (recursive) |
| //note: requires that mo.clearData already removed back references |
| if( mo.getRDDHandle()!=null ) { |
| rCleanupLineageObject(mo.getRDDHandle()); |
| } |
| if( mo.getBroadcastHandle()!=null ) { |
| rCleanupLineageObject(mo.getBroadcastHandle()); |
| } |
| } |
| } |
| catch(Exception ex) { |
| throw new DMLRuntimeException(ex); |
| } |
| } |
| |
| @SuppressWarnings({ "rawtypes", "unchecked" }) |
| private void rCleanupLineageObject(LineageObject lob) |
| throws IOException |
| { |
		//abort recursive cleanup if there are still consumers
| if( lob.getNumReferences() > 0 ) |
| return; |
| |
| //abort if still reachable through matrix object (via back references for |
| //robustness in function calls and to prevent repeated scans of the symbol table) |
| if( lob.hasBackReference() ) |
| return; |
| |
| //cleanup current lineage object (from driver/executors) |
| //incl deferred hdfs file removal (only if metadata set by cleanup call) |
| if( lob instanceof RDDObject ) { |
| RDDObject rdd = (RDDObject)lob; |
| int rddID = rdd.getRDD().id(); |
| cleanupRDDVariable(rdd.getRDD()); |
| if( rdd.getHDFSFilename()!=null ) { //deferred file removal |
| HDFSTool.deleteFileWithMTDIfExistOnHDFS(rdd.getHDFSFilename()); |
| } |
| if( rdd.isParallelizedRDD() ) |
| _parRDDs.deregisterRDD(rddID); |
| } |
| else if( lob instanceof BroadcastObject ) { |
| BroadcastObject bob = (BroadcastObject) lob; |
| // clean the partitioned broadcast |
| if( bob.isPartitionedBroadcastValid() ) { |
| PartitionedBroadcast pbm = bob.getPartitionedBroadcast(); |
| if( pbm != null ) //robustness against evictions |
| pbm.destroy(); |
| } |
| // clean the non-partitioned broadcast |
| if( bob.isNonPartitionedBroadcastValid() ) { |
| Broadcast<CacheableData> bc = bob.getNonPartitionedBroadcast(); |
| if( bc != null ) //robustness against evictions |
| cleanupBroadcastVariable(bc); |
| } |
| CacheableData.addBroadcastSize(-bob.getNonPartitionedBroadcastSize()); |
| } |
| |
| //recursively process lineage children |
| for( LineageObject c : lob.getLineageChilds() ){ |
| c.decrementNumReferences(); |
| rCleanupLineageObject(c); |
| } |
| } |
| |
| /** |
| * This call destroys a broadcast variable at all executors and the driver. |
| * Hence, it is intended to be used by rmvar instructions only. Depending on the |
| * ASYNCHRONOUS_VAR_DESTROY configuration, this is asynchronous or not. |
| * |
| * @param bvar broadcast variable |
| */ |
| public static void cleanupBroadcastVariable(Broadcast<?> bvar) |
| { |
| //In comparison to 'unpersist' (which would only delete the broadcast |
| //from the executors), this call also deletes related data from the driver. |
| if( bvar.isValid() ) { |
| bvar.destroy( !ASYNCHRONOUS_VAR_DESTROY ); |
| } |
| } |
| |
| /** |
| * This call removes an rdd variable from executor memory and disk if required. |
| * Hence, it is intended to be used by rmvar instructions only. Depending on the |
| * ASYNCHRONOUS_VAR_DESTROY configuration, this is asynchronous or not. |
| * |
| * @param rvar rdd variable to remove |
| */ |
| public static void cleanupRDDVariable(JavaPairRDD<?,?> rvar) |
| { |
| if( rvar.getStorageLevel()!=StorageLevel.NONE() ) { |
| rvar.unpersist( !ASYNCHRONOUS_VAR_DESTROY ); |
| } |
| } |
| |
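| /** |
| * Repartitions the rdd of the given matrix object into its preferred number |
| * of partitions, persists the result in the default checkpoint storage level, |
| * and replaces the rdd handle of the matrix object in place. Illustrative |
| * usage (variable name hypothetical): |
| * <pre>{@code |
| * sec.repartitionAndCacheMatrixObject("X"); //e.g., before repeated reads |
| * }</pre> |
| * |
| * @param var variable name of the matrix object |
| */ |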
| @SuppressWarnings("unchecked") |
| public void repartitionAndCacheMatrixObject( String var ) { |
| MatrixObject mo = getMatrixObject(var); |
| DataCharacteristics dcIn = mo.getDataCharacteristics(); |
| |
| //double check size to avoid unnecessary spark context creation |
| if( !OptimizerUtils.exceedsCachingThreshold(mo.getNumColumns(), |
| OptimizerUtils.estimateSizeExactSparsity(dcIn)) ) |
| return; |
| |
| //get input rdd and default storage level |
| JavaPairRDD<MatrixIndexes,MatrixBlock> in = (JavaPairRDD<MatrixIndexes, MatrixBlock>) |
| getRDDHandleForMatrixObject(mo, FileFormat.BINARY); |
| |
| //avoid unnecessary caching of input in order to reduce memory pressure |
| if( mo.getRDDHandle().allowsShortCircuitRead() |
| && isRDDMarkedForCaching(in.id()) && !isRDDCached(in.id()) ) { |
| in = (JavaPairRDD<MatrixIndexes,MatrixBlock>) |
| ((RDDObject)mo.getRDDHandle().getLineageChilds().get(0)).getRDD(); |
| |
| //guard against an unnecessarily large number of partitions |
| int numPartitions = SparkUtils.getNumPreferredPartitions(dcIn, in); |
| if( numPartitions < in.getNumPartitions() ) |
| in = in.coalesce( numPartitions ); |
| } |
| |
| //repartition rdd (force creation of a shuffled rdd via merge); note: no deep |
| //copy needed although executed on the original data, because there are no key |
| //duplicates and hence no actual merging of blocks |
| JavaPairRDD<MatrixIndexes,MatrixBlock> out = RDDAggregateUtils.mergeByKey(in, false); |
| |
| //convert mcsr into memory-efficient csr if potentially sparse |
| if( OptimizerUtils.checkSparseBlockCSRConversion(dcIn) ) { |
| out = out.mapValues(new CreateSparseBlockFunction(SparseBlock.Type.CSR)); |
| } |
| |
| //persist rdd in default storage level |
| out.persist( Checkpoint.DEFAULT_STORAGE_LEVEL ) |
| .count(); //trigger caching to prevent contention |
| |
| //create new rdd handle, in-place of current matrix object |
| RDDObject inro = mo.getRDDHandle(); //guaranteed to exist (see above) |
| RDDObject outro = new RDDObject(out); //create new rdd object |
| outro.setCheckpointRDD(true); //mark as checkpointed |
| outro.addLineageChild(inro); //keep lineage to prevent cycles on cleanup |
| mo.setRDDHandle(outro); |
| } |
| |
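| /** |
| * Triggers distributed caching of the rdd associated with the given matrix |
| * object, if its estimated size exceeds the local caching threshold and it |
| * is not already cached. |
| * |
| * @param var variable name of the matrix object |
| */ |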
| @SuppressWarnings("unchecked") |
| public void cacheMatrixObject( String var ) { |
| MatrixObject mo = getMatrixObject(var); |
| |
| //double check size to avoid unnecessary spark context creation |
| if( !OptimizerUtils.exceedsCachingThreshold(mo.getNumColumns(), |
| OptimizerUtils.estimateSizeExactSparsity(mo.getDataCharacteristics())) ) |
| return; |
| |
| //get input rdd w/ default storage level |
| JavaPairRDD<MatrixIndexes,MatrixBlock> in = (JavaPairRDD<MatrixIndexes, MatrixBlock>) |
| getRDDHandleForMatrixObject(mo, FileFormat.BINARY); |
| |
| //persist rdd (force rdd caching, if not already cached) |
| if( !isRDDCached(in.id()) ) |
| in.count(); //trigger caching to prevent contention |
| } |
| |
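| /** |
| * Allocates a thread-local fair scheduler pool (if fair scheduler mode is |
| * enabled) and returns its id for later cleanup. Illustrative paired usage |
| * in a worker thread: |
| * <pre>{@code |
| * int pool = sec.setThreadLocalSchedulerPool(); |
| * try { |
| *   //execute spark jobs in this scheduler pool |
| * } |
| * finally { |
| *   sec.cleanupThreadLocalSchedulerPool(pool); |
| * } |
| * }</pre> |
| * |
| * @return allocated scheduler pool id, or -1 if fair scheduling is disabled |
| */ |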
| public int setThreadLocalSchedulerPool() { |
| int pool = -1; |
| if( FAIR_SCHEDULER_MODE ) { |
| pool = allocSchedulerPoolName(); |
| getSparkContext().sc().setLocalProperty( |
| "spark.scheduler.pool", "parforPool"+pool); |
| } |
| return pool; |
| } |
| |
| public void cleanupThreadLocalSchedulerPool(int pool) { |
| if( FAIR_SCHEDULER_MODE ) { |
| freeSchedulerPoolName(pool); |
| getSparkContext().sc().setLocalProperty( |
| "spark.scheduler.pool", null); |
| } |
| } |
| |
| private static synchronized int allocSchedulerPoolName() { |
| int pool = ArrayUtils.indexOf(_poolBuff, false); |
| //grow pool on demand |
| if( pool < 0 ) { |
| pool = _poolBuff.length; |
| _poolBuff = Arrays.copyOf(_poolBuff, |
| (int)Math.min(2L*pool, Integer.MAX_VALUE)); |
| } |
| //mark pool name as in use |
| _poolBuff[pool] = true; |
| return pool; |
| } |
| |
| private static synchronized void freeSchedulerPoolName(int pool) { |
| _poolBuff[pool] = false; |
| } |
| |
| @SuppressWarnings("resource") |
| private boolean isRDDMarkedForCaching( int rddID ) { |
| JavaSparkContext jsc = getSparkContext(); |
| return jsc.sc().getPersistentRDDs().contains(rddID); |
| } |
| |
| @SuppressWarnings("resource") |
| public boolean isRDDCached( int rddID ) { |
| //check that rdd is marked for caching |
| JavaSparkContext jsc = getSparkContext(); |
| if( !jsc.sc().getPersistentRDDs().contains(rddID) ) { |
| return false; |
| } |
| |
| //check that rdd is actually already cached |
| for( RDDInfo info : jsc.sc().getRDDStorageInfo() ) { |
| if( info.id() == rddID ) |
| return info.isCached(); |
| } |
| return false; |
| } |
| |
| /////////////////////////////////////////// |
| // Spark configuration handling |
| /////// |
| |
| /** |
| * Obtains the lazily analyzed spark cluster configuration. |
| * |
| * @return spark cluster configuration |
| */ |
| public static SparkClusterConfig getSparkClusterConfig() { |
| //lazy creation of spark cluster config |
| if( _sconf == null ) |
| _sconf = new SparkClusterConfig(); |
| return _sconf; |
| } |
| |
| /** |
| * Obtains the available memory budget for broadcast variables in bytes. |
| * |
| * @return broadcast memory budget |
| */ |
| public static double getBroadcastMemoryBudget() { |
| return getSparkClusterConfig() |
| .getBroadcastMemoryBudget(); |
| } |
| |
| /** |
| * Obtains the available memory budget for data storage in bytes. |
| * |
| * @param min flag for minimum data budget |
| * @param refresh flag for refresh with spark context |
| * @return data memory budget |
| */ |
| public static double getDataMemoryBudget(boolean min, boolean refresh) { |
| return getSparkClusterConfig() |
| .getDataMemoryBudget(min, refresh); |
| } |
| |
| /** |
| * Obtains the number of executors in the cluster (excluding the driver). |
| * |
| * @return number of executors |
| */ |
| public static int getNumExecutors() { |
| return getSparkClusterConfig() |
| .getNumExecutors(); |
| } |
| |
| /** |
| * Obtains the default degree of parallelism (cores in the cluster). |
| * |
| * @param refresh flag for refresh with spark context |
| * @return default degree of parallelism |
| */ |
| public static int getDefaultParallelism(boolean refresh) { |
| return getSparkClusterConfig() |
| .getDefaultParallelism(refresh); |
| } |
| |
| /** |
| * Captures relevant spark cluster configuration properties, e.g., memory budgets and |
| * degree of parallelism. This configuration abstracts legacy ({@literal <} Spark 1.6) and current |
| * configurations and provides a unified view. |
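| * For example, under the unified memory manager with default settings |
| * ({@code spark.executor.memory=1g}, {@code spark.memory.fraction=0.6}), |
| * the per-executor broadcast budget evaluates to approximately |
| * (1024MB - 300MB reserved) * 0.6 * 0.35 = 152MB. |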
| */ |
| public static class SparkClusterConfig |
| { |
| //broadcasts are stored in mem-and-disk in data space, this config |
| //defines the fraction of data space to be used as broadcast budget |
| private static final double BROADCAST_DATA_FRACTION = 0.35; |
| |
| //mirrors the private constant from Spark's UnifiedMemoryManager.scala (Spark >= 1.6) |
| private static final long RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024; |
| |
| //meta configurations |
| private boolean _legacyVersion = false; //spark version <1.6 |
| private boolean _confOnly = false; //infrastructure info based on config |
| |
| //memory management configurations |
| private long _memExecutor = -1; //mem per executor |
| private double _memDataMinFrac = -1; //minimum data fraction |
| private double _memDataMaxFrac = -1; //maximum data fraction |
| private double _memBroadcastFrac = -1; //broadcast fraction |
| |
| //degree of parallelism configurations |
| private int _numExecutors = -1; //total executors |
| private int _defaultPar = -1; //total vcores |
| |
| public SparkClusterConfig() |
| { |
| SparkConf sconf = createSystemDSSparkConf(); |
| _confOnly = true; |
| |
| //parse version and config |
| String sparkVersion = getSparkVersionString(); |
| _legacyVersion = (UtilFunctions.compareVersion(sparkVersion, "1.6.0") < 0 |
| || sconf.getBoolean("spark.memory.useLegacyMode", false) ); |
| |
| //obtain basic spark configurations |
| if( _legacyVersion ) |
| analyzeSparkConfiguationLegacy(sconf); |
| else |
| analyzeSparkConfiguation(sconf); |
| |
| //log debug of created spark cluster config |
| if( LOG.isDebugEnabled() ) |
| LOG.debug( this.toString() ); |
| } |
| |
| public long getBroadcastMemoryBudget() { |
| return (long) (_memExecutor * _memBroadcastFrac); |
| } |
| |
| public long getDataMemoryBudget(boolean min, boolean refresh) { |
| //always get the current num executors on refresh because this might change |
| //if not all executors are initially allocated, which is plan-relevant |
| int numExec = _numExecutors; |
| if( (refresh && !_confOnly) || isSparkContextCreated() ) { |
| numExec = Math.max(getSparkContextStatic().sc() |
| .getExecutorMemoryStatus().size() - 1, 1); |
| } |
| |
| //compute data memory budget |
| return (long) ( numExec * _memExecutor * |
| (min ? _memDataMinFrac : _memDataMaxFrac) ); |
| } |
| |
| public int getNumExecutors() { |
| if( _numExecutors < 0 ) |
| analyzeSparkParallelismConfiguation(null); |
| return _numExecutors; |
| } |
| |
| public int getDefaultParallelism(boolean refresh) { |
| if( _defaultPar < 0 && !refresh ) |
| analyzeSparkParallelismConfiguation(null); |
| |
| //always get the current default parallelism on refresh because this might |
| //change if not all executors are initially allocated, which is plan-relevant |
| int par = ( (refresh && !_confOnly) || isSparkContextCreated() ) ? |
| getSparkContextStatic().defaultParallelism() : _defaultPar; |
| return Math.max(par, 1); //robustness: ensure a minimum parallelism of 1 |
| } |
| |
| public void analyzeSparkConfiguationLegacy(SparkConf conf) { |
| //ensure allocated spark conf |
| SparkConf sconf = (conf == null) ? createSystemDSSparkConf() : conf; |
| |
| //parse absolute executor memory |
| _memExecutor = UtilFunctions.parseMemorySize( |
| sconf.get("spark.executor.memory", "1g")); |
| |
| //get data and shuffle memory ratios (defaults apply if not specified in job conf) |
| double dataFrac = sconf.getDouble("spark.storage.memoryFraction", 0.6); //default 60% |
| _memDataMinFrac = dataFrac; |
| _memDataMaxFrac = dataFrac; |
| _memBroadcastFrac = dataFrac * BROADCAST_DATA_FRACTION; //default 18% |
| |
| //analyze spark degree of parallelism |
| analyzeSparkParallelismConfiguation(sconf); |
| } |
| |
| public void analyzeSparkConfiguation(SparkConf conf) { |
| //ensure allocated spark conf |
| SparkConf sconf = (conf == null) ? createSystemDSSparkConf() : conf; |
| |
| //parse absolute executor memory, incl fixed cutoff |
| _memExecutor = UtilFunctions.parseMemorySize( |
| sconf.get("spark.executor.memory", "1g")) |
| - RESERVED_SYSTEM_MEMORY_BYTES; |
| |
| //get data and shuffle memory ratios (defaults apply if not specified in job conf) |
| _memDataMinFrac = sconf.getDouble("spark.memory.storageFraction", 0.5); //default 50% |
| _memDataMaxFrac = sconf.getDouble("spark.memory.fraction", 0.6); //default 60% |
| _memBroadcastFrac = _memDataMaxFrac * BROADCAST_DATA_FRACTION; //default 21% |
| |
| //analyze spark degree of parallelism |
| analyzeSparkParallelismConfiguation(sconf); |
| } |
| |
| private void analyzeSparkParallelismConfiguation(SparkConf conf) { |
| //ensure allocated spark conf |
| SparkConf sconf = (conf == null) ? createSystemDSSparkConf() : conf; |
| |
| int numExecutors = sconf.getInt("spark.executor.instances", -1); |
| int numCoresPerExec = sconf.getInt("spark.executor.cores", -1); |
| int defaultPar = sconf.getInt("spark.default.parallelism", -1); |
| |
| if( numExecutors > 1 && (defaultPar > 1 || numCoresPerExec > 1) ) { |
| _numExecutors = numExecutors; |
| _defaultPar = (defaultPar>1) ? defaultPar : numExecutors * numCoresPerExec; |
| _confOnly &= true; //keep conf-only status |
| } |
| else if( DMLScript.USE_LOCAL_SPARK_CONFIG ) { |
| //avoid unnecessary spark context creation in local mode (e.g., tests) |
| _numExecutors = 1; |
| _defaultPar = 2; |
| _confOnly &= true; //keep conf-only status |
| } |
| else { |
| //get default parallelism (total number of executors and cores) |
| //note: spark context provides this information while conf does not |
| //(for num executors we need to correct for driver and local mode) |
| @SuppressWarnings("resource") |
| JavaSparkContext jsc = getSparkContextStatic(); |
| _numExecutors = Math.max(jsc.sc().getExecutorMemoryStatus().size() - 1, 1); |
| _defaultPar = jsc.defaultParallelism(); |
| _confOnly &= false; //implies env info refresh w/ spark context |
| } |
| } |
| |
| /** |
| * Obtains the spark version string. If the spark context has been created, |
| * we simply get it from the context; otherwise, we use Spark internal |
| * constants to avoid creating the spark context just for the version. |
| * |
| * @return spark version string |
| */ |
| private static String getSparkVersionString() { |
| //check for existing spark context |
| if( isSparkContextCreated() ) |
| return getSparkContextStatic().version(); |
| |
| //use spark internal constant to avoid context creation |
| return org.apache.spark.package$.MODULE$.SPARK_VERSION(); |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder sb = new StringBuilder("SparkClusterConfig: \n"); |
| sb.append("-- legacyVersion = " + _legacyVersion + " ("+getSparkVersionString()+")\n" ); |
| sb.append("-- confOnly = " + _confOnly + "\n"); |
| sb.append("-- numExecutors = " + _numExecutors + "\n"); |
| sb.append("-- defaultPar = " + _defaultPar + "\n"); |
| sb.append("-- memExecutor = " + _memExecutor + "\n"); |
| sb.append("-- memDataMinFrac = " + _memDataMinFrac + "\n"); |
| sb.append("-- memDataMaxFrac = " + _memDataMaxFrac + "\n"); |
| sb.append("-- memBroadcastFrac = " + _memBroadcastFrac + "\n"); |
| return sb.toString(); |
| } |
| } |
| |
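| /** |
| * Simple memory manager for locally parallelized rdds, which tracks the |
| * aggregate size of registered rdds against a fixed fraction of the local |
| * max memory. Illustrative protocol (size and id hypothetical): |
| * <pre>{@code |
| * if( _parRDDs.reserve(rddSize) ) |
| *   _parRDDs.registerRDD(rddID, rddSize, true); //register after reserve |
| * ... |
| * _parRDDs.deregisterRDD(rddID); //release on cleanup |
| * }</pre> |
| */ |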
| private static class MemoryManagerParRDDs |
| { |
| private final long _limit; |
| private long _size; |
| private HashMap<Integer, Long> _rdds; |
| |
| public MemoryManagerParRDDs(double fractionMem) { |
| _limit = (long)(fractionMem * InfrastructureAnalyzer.getLocalMaxMemory()); |
| _size = 0; |
| _rdds = new HashMap<>(); |
| } |
| |
| public synchronized boolean reserve(long rddSize) { |
| boolean ret = (rddSize + _size < _limit); |
| _size += ret ? rddSize : 0; |
| return ret; |
| } |
| |
| public synchronized void registerRDD(int rddID, long rddSize, boolean reserved) { |
| if( !reserved ) { |
| throw new RuntimeException("Unsupported rdd registration " |
| + "without size reservation for "+rddSize+" bytes."); |
| } |
| _rdds.put(rddID, rddSize); |
| } |
| |
| public synchronized void deregisterRDD(int rddID) { |
| Long rddSize = _rdds.remove(rddID); |
| _size -= (rddSize!=null) ? rddSize : 0; |
| } |
| |
| public synchronized void clear() { |
| _size = 0; |
| _rdds.clear(); |
| } |
| } |
| } |