/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.runtime.controlprogram.parfor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Writable;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.ml.linalg.SparseVector;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.util.LongAccumulator;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.runtime.controlprogram.LocalVariableMap;
import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PDataPartitionFormat;
import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PartitionFormat;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysds.runtime.controlprogram.parfor.util.PairWritableBlock;
import org.apache.sysds.runtime.instructions.spark.data.DatasetObject;
import org.apache.sysds.runtime.instructions.spark.data.RDDObject;
import org.apache.sysds.runtime.instructions.spark.utils.RDDConverterUtils;
import org.apache.sysds.runtime.instructions.spark.utils.RDDConverterUtils.DataFrameExtractIDFunction;
import org.apache.sysds.runtime.instructions.spark.utils.SparkUtils;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.meta.DataCharacteristics;
import org.apache.sysds.runtime.util.UtilFunctions;
import org.apache.sysds.utils.Statistics;
import scala.Tuple2;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
/**
 * Remote data-partition-execute parfor runtime on Spark: partitions the
 * input matrix and directly executes the parfor body program per partition
 * within a single Spark job, which avoids materializing the partitioned
 * matrix as a separate intermediate.
 *
 * TODO heavy hitter maintenance
 * TODO data partitioning with binarycell
 */
public class RemoteDPParForSpark
{
protected static final Log LOG = LogFactory.getLog(RemoteDPParForSpark.class.getName());
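	/**
	 * Submits a single Spark job that partitions the given input matrix
	 * according to the requested partition format, executes the serialized
	 * parfor body program per partition, and collects the serialized result
	 * variables of all tasks. Task and iteration counts are maintained via
	 * Spark accumulators.
	 */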
public static RemoteParForJobReturn runJob(long pfid, String itervar, String matrixvar, String program, HashMap<String, byte[]> clsMap,
String resultFile, MatrixObject input, ExecutionContext ec, PartitionFormat dpf, FileFormat fmt,
boolean tSparseCol, boolean enableCPCaching, int numReducers )
{
String jobname = "ParFor-DPESP";
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
SparkExecutionContext sec = (SparkExecutionContext)ec;
JavaSparkContext sc = sec.getSparkContext();
//prepare input parameters
MatrixObject mo = sec.getMatrixObject(matrixvar);
DataCharacteristics mc = mo.getDataCharacteristics();
//initialize accumulators for tasks/iterations, and inputs
JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec.getBinaryMatrixBlockRDDHandleForVariable(matrixvar);
LongAccumulator aTasks = sc.sc().longAccumulator("tasks");
LongAccumulator aIters = sc.sc().longAccumulator("iterations");
//compute number of reducers (to avoid OOMs and reduce memory pressure)
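		//i.e., use at least the requested number of reducers, but otherwise the
		//smaller of the preferred input partitions and the number of created partitions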
int numParts = SparkUtils.getNumPreferredPartitions(mc, in);
int numReducers2 = Math.max(numReducers, Math.min(numParts, (int)dpf.getNumParts(mc)));
//core parfor datapartition-execute (w/ or w/o shuffle, depending on data characteristics)
RemoteDPParForSparkWorker efun = new RemoteDPParForSparkWorker(program, clsMap,
matrixvar, itervar, enableCPCaching, mc, tSparseCol, dpf, fmt, aTasks, aIters);
JavaPairRDD<Long,Writable> tmp = getPartitionedInput(sec, matrixvar, fmt, dpf);
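		//if partitions span multiple blocks, group partial partition slices via shuffle;
		//otherwise wrap each value into a singleton iterable to match the worker signature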
List<Tuple2<Long,String>> out = (requiresGrouping(dpf, mo) ?
tmp.groupByKey(numReducers2) : tmp.map(new PseudoGrouping()) )
.mapPartitionsToPair(efun) //execute parfor tasks, incl cleanup
.collect(); //get output handles
//de-serialize results
LocalVariableMap[] results = RemoteParForUtils.getResults(out, LOG);
int numTasks = aTasks.value().intValue(); //get accumulator value
int numIters = aIters.value().intValue(); //get accumulator value
//create output symbol table entries
RemoteParForJobReturn ret = new RemoteParForJobReturn(true, numTasks, numIters, results);
//maintain statistics
Statistics.incrementNoOfCompiledSPInst();
Statistics.incrementNoOfExecutedSPInst();
if( DMLScript.STATISTICS ){
Statistics.maintainCPHeavyHitters(jobname, System.nanoTime()-t0);
}
return ret;
}
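	/**
	 * Creates the partitioned input as a pair RDD of partition keys and partial
	 * or complete partitions, via one of three paths: (1) direct row-wise
	 * conversion of an existing input dataset (no shuffle), (2) block-local
	 * data partitioning without grouping, or (3) data partitioning whose
	 * outputs are subsequently grouped by partition key in runJob.
	 */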
@SuppressWarnings("unchecked")
private static JavaPairRDD<Long, Writable> getPartitionedInput(SparkExecutionContext sec,
String matrixvar, FileFormat fmt, PartitionFormat dpf)
{
MatrixObject mo = sec.getMatrixObject(matrixvar);
DataCharacteristics mc = mo.getDataCharacteristics();
//leverage existing dataset (w/o shuffling for reblock and data partitioning)
//NOTE: there will always be a checkpoint rdd on top of the input rdd and the dataset
if( hasInputDataSet(dpf, mo) )
{
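			//unwrap the dataset object from the lineage: checkpoint rdd -> input rdd -> dataset
			//(this shape is guaranteed by the preceding hasInputDataSet check)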
DatasetObject dsObj = (DatasetObject)mo.getRDDHandle()
.getLineageChilds().get(0).getLineageChilds().get(0);
Dataset<Row> in = dsObj.getDataset();
//construct or reuse row ids
JavaPairRDD<Row, Long> prepinput = dsObj.containsID() ?
in.javaRDD().mapToPair(new DataFrameExtractIDFunction(
in.schema().fieldIndex(RDDConverterUtils.DF_ID_COLUMN))) :
in.javaRDD().zipWithIndex(); //zip row index
//convert row to row in matrix block format
return prepinput.mapToPair(new DataFrameToRowBinaryBlockFunction(
mc.getCols(), dsObj.isVectorBased(), dsObj.containsID()));
}
//binary block input rdd without grouping
else if( !requiresGrouping(dpf, mo) )
{
//get input rdd and data partitioning
JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec.getBinaryMatrixBlockRDDHandleForVariable(matrixvar);
DataPartitionerRemoteSparkMapper dpfun = new DataPartitionerRemoteSparkMapper(mc, fmt, dpf._dpf, dpf._N);
return in.flatMapToPair(dpfun);
}
//default binary block input rdd with grouping
else
{
			//get input rdd, avoid unnecessary caching if input is checkpoint and not cached yet
			//to reduce memory pressure for the shuffle and subsequent operations
JavaPairRDD<MatrixIndexes,MatrixBlock> in = sec.getBinaryMatrixBlockRDDHandleForVariable(matrixvar);
if( mo.getRDDHandle().isCheckpointRDD() && !sec.isRDDCached(in.id()) )
in = (JavaPairRDD<MatrixIndexes,MatrixBlock>)((RDDObject)
mo.getRDDHandle().getLineageChilds().get(0)).getRDD();
//data partitioning of input rdd
DataPartitionerRemoteSparkMapper dpfun = new DataPartitionerRemoteSparkMapper(mc, fmt, dpf._dpf, dpf._N);
return in.flatMapToPair(dpfun);
}
}
	//determines if the given input matrix requires grouping of partial partition slices,
	//i.e., if a logical partition spans multiple row or column blocks
private static boolean requiresGrouping(PartitionFormat dpf, MatrixObject mo) {
DataCharacteristics mc = mo.getDataCharacteristics();
return ((dpf == PartitionFormat.ROW_WISE && mc.getNumColBlocks() > 1)
|| (dpf == PartitionFormat.COLUMN_WISE && mc.getNumRowBlocks() > 1)
|| (dpf._dpf == PDataPartitionFormat.ROW_BLOCK_WISE_N && mc.getNumColBlocks() > 1)
|| (dpf._dpf == PDataPartitionFormat.COLUMN_BLOCK_WISE_N && mc.getNumRowBlocks() > 1))
&& !hasInputDataSet(dpf, mo);
}
	//determines if the given input matrix wraps an input dataset applicable to direct
	//processing, i.e., row-wise partitioning of a checkpointed RDD created from a dataset
private static boolean hasInputDataSet(PartitionFormat dpf, MatrixObject mo) {
return (dpf == PartitionFormat.ROW_WISE
&& mo.getRDDHandle().isCheckpointRDD()
&& mo.getRDDHandle().getLineageChilds().size()==1
&& mo.getRDDHandle().getLineageChilds().get(0).getLineageChilds().size()==1
&& mo.getRDDHandle().getLineageChilds().get(0).getLineageChilds().get(0) instanceof DatasetObject);
}
	//function to map data partition output to the parfor input signature without grouping;
	//wraps each value into a singleton iterable to mimic the output type of groupByKey w/o a shuffle
private static class PseudoGrouping implements Function<Tuple2<Long, Writable>, Tuple2<Long, Iterable<Writable>>> {
private static final long serialVersionUID = 2016614593596923995L;
@Override
public Tuple2<Long, Iterable<Writable>> call(Tuple2<Long, Writable> arg0) {
return new Tuple2<>(arg0._1(), Collections.singletonList(arg0._2()));
}
}
//function to map dataset rows to rows in binary block representation
private static class DataFrameToRowBinaryBlockFunction implements PairFunction<Tuple2<Row,Long>,Long,Writable>
{
private static final long serialVersionUID = -3162404379379461523L;
private final long _clen;
private final boolean _containsID;
private final boolean _isVector;
		//note: parameter order matches the call site in getPartitionedInput
		public DataFrameToRowBinaryBlockFunction(long clen, boolean isVector, boolean containsID) {
			_clen = clen;
			_isVector = isVector;
			_containsID = containsID;
		}
@Override
public Tuple2<Long, Writable> call(Tuple2<Row, Long> arg0)
throws Exception
{
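			//convert the 0-based rdd row index into a 1-based matrix row index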
long rowix = arg0._2() + 1;
//process row data
			int off = _containsID ? 1 : 0;
Object obj = _isVector ? arg0._1().get(off) : arg0._1();
boolean sparse = (obj instanceof SparseVector);
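			//allocate a single-row matrix block in sparse or dense representation,
			//according to the representation of the source vector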
MatrixBlock mb = new MatrixBlock(1, (int)_clen, sparse);
if( _isVector ) {
Vector vect = (Vector) obj;
if( vect instanceof SparseVector ) {
SparseVector svect = (SparseVector) vect;
int lnnz = svect.numNonzeros();
for( int k=0; k<lnnz; k++ )
mb.appendValue(0, svect.indices()[k], svect.values()[k]);
}
else { //dense
for( int j=0; j<_clen; j++ )
mb.appendValue(0, j, vect.apply(j));
}
}
else { //row
Row row = (Row) obj;
for( int j=off; j<off+_clen; j++ )
mb.appendValue(0, j-off, UtilFunctions.getDouble(row.get(j)));
}
mb.examSparsity();
return new Tuple2<>(rowix, new PairWritableBlock(new MatrixIndexes(1,1),mb));
}
}
}