/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.runtime.controlprogram.parfor;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.ParForProgramBlock.PartitionFormat;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.controlprogram.context.SparkExecutionContext;
import org.apache.sysds.runtime.instructions.spark.utils.SparkUtils;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.meta.DataCharacteristics;
import org.apache.sysds.runtime.util.HDFSTool;
import org.apache.sysds.utils.Statistics;

/**
 * Spark job class for submitting parfor remote data partitioning jobs.
 */
public class DataPartitionerRemoteSpark extends DataPartitioner
{
	private final ExecutionContext _ec;
	private final long _numRed;
	private final int _replication;

	public DataPartitionerRemoteSpark(PartitionFormat dpf, ExecutionContext ec, long numRed, int replication, boolean keepIndexes)
	{
		super(dpf._dpf, dpf._N);
		_ec = ec;
		_numRed = numRed;
		_replication = replication;
		//note: keepIndexes is not used by the spark data partitioner
	}
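
	//data flow of partitionMatrix below: the mapper re-keys the input binary-block
	//rdd by target partition id, groupByKey shuffles all blocks of a partition into
	//one reducer group, and the reducer writes each group as a partition to hdfs.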
	@Override
	@SuppressWarnings("unchecked")
	protected void partitionMatrix(MatrixObject in, String fnameNew, FileFormat fmt, long rlen, long clen, int blen)
	{
		String jobname = "ParFor-DPSP";
		long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;

		SparkExecutionContext sec = (SparkExecutionContext)_ec;

		try
		{
			//cleanup existing output files
			HDFSTool.deleteFileIfExistOnHDFS(fnameNew);

			//get input rdd
			JavaPairRDD<MatrixIndexes, MatrixBlock> inRdd = (JavaPairRDD<MatrixIndexes, MatrixBlock>)
				sec.getRDDHandleForMatrixObject(in, FileFormat.BINARY);

			//determine degree of parallelism
			DataCharacteristics mc = in.getDataCharacteristics();
			int numRed = (int)determineNumReducers(inRdd, mc, _numRed);

			//run spark remote data partition job
			DataPartitionerRemoteSparkMapper dpfun = new DataPartitionerRemoteSparkMapper(mc, fmt, _format, _n);
			DataPartitionerRemoteSparkReducer wfun = new DataPartitionerRemoteSparkReducer(fnameNew, fmt, _replication);
			inRdd.flatMapToPair(dpfun) //partition the input blocks
				.groupByKey(numRed)    //group partition blocks
				.foreach(wfun);        //write partitions to hdfs
		}
		catch(Exception ex) {
			throw new DMLRuntimeException(ex);
		}

		//maintain statistics
		Statistics.incrementNoOfCompiledSPInst();
		Statistics.incrementNoOfExecutedSPInst();
		if( DMLScript.STATISTICS ){
			Statistics.maintainCPHeavyHitters(jobname, System.nanoTime()-t0);
		}
	}
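
	//illustrative sizing example (values assumed, not from the source): for a
	//1,000,000 x 100 matrix partitioned ROW_BLOCK_WISE with blen=1000, reducerGroups
	//= ceil(1000000/1000) = 1000; if numRed=10 was requested and numParts=200
	//partitions are preferred, the job runs with max(10, min(200, 1000)) = 200 reducers.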
	private long determineNumReducers(JavaPairRDD<MatrixIndexes,MatrixBlock> in, DataCharacteristics mc, long numRed)
	{
		long rlen = mc.getRows();
		long clen = mc.getCols();
		int blen = mc.getBlocksize();

		//determine number of reducer groups
		long reducerGroups = -1;
		switch( _format ) {
			case ROW_WISE:            reducerGroups = rlen; break;
			case COLUMN_WISE:         reducerGroups = clen; break;
			case ROW_BLOCK_WISE:      reducerGroups = (rlen/blen)+((rlen%blen==0)?0:1); break;
			case COLUMN_BLOCK_WISE:   reducerGroups = (clen/blen)+((clen%blen==0)?0:1); break;
			case ROW_BLOCK_WISE_N:    reducerGroups = (rlen/_n)+((rlen%_n==0)?0:1); break;
			case COLUMN_BLOCK_WISE_N: reducerGroups = (clen/_n)+((clen%_n==0)?0:1); break;
			default:
				//do nothing
		}

		//compute number of reducers (to avoid OOMs and reduce memory pressure)
		int numParts = SparkUtils.getNumPreferredPartitions(mc, in);
		return Math.max(numRed, Math.min(numParts, reducerGroups));
	}
}
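
//usage sketch (hypothetical, for illustration only; assumes the inherited
//DataPartitioner.createPartitionedMatrixObject entry point and a SparkExecutionContext sec):
//  DataPartitioner dp = new DataPartitionerRemoteSpark(
//      new PartitionFormat(PDataPartitionFormat.ROW_WISE, -1), sec, -1, 1, false);
//  MatrixObject pmo = dp.createPartitionedMatrixObject(mo, fnameNew);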