[SYSTEMML-2196] Multi-threaded matrix blocking on RDD parallelize
This patch improves the performance for CP to Spark data exchange by
parallelizing the matrix blocking on RDD parallelize. For a scenario of
10 x 800MB matrix parallelization (incl. blocking), this patch improved
performance from 8.1s to 3.8s.
diff --git a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
index 179c24a..fb268d4 100644
--- a/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
+++ b/src/main/java/org/apache/sysml/runtime/controlprogram/context/SparkExecutionContext.java
@@ -20,9 +20,12 @@
package org.apache.sysml.runtime.controlprogram.context;
import java.io.IOException;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -684,37 +687,17 @@
throws DMLRuntimeException
{
long t0 = DMLScript.STATISTICS ? System.nanoTime() : 0;
- LinkedList<Tuple2<MatrixIndexes,MatrixBlock>> list = new LinkedList<>();
+ List<Tuple2<MatrixIndexes,MatrixBlock>> list = null;
- if( src.getNumRows() <= brlen
- && src.getNumColumns() <= bclen )
- {
- list.addLast(new Tuple2<>(new MatrixIndexes(1,1), src));
+ if( src.getNumRows() <= brlen && src.getNumColumns() <= bclen ) {
+ list = Arrays.asList(new Tuple2<>(new MatrixIndexes(1,1), src));
}
- else
- {
- boolean sparse = src.isInSparseFormat();
-
- //create and write subblocks of matrix
- for(int blockRow = 0; blockRow < (int)Math.ceil(src.getNumRows()/(double)brlen); blockRow++)
- for(int blockCol = 0; blockCol < (int)Math.ceil(src.getNumColumns()/(double)bclen); blockCol++)
- {
- int maxRow = (blockRow*brlen + brlen < src.getNumRows()) ? brlen : src.getNumRows() - blockRow*brlen;
- int maxCol = (blockCol*bclen + bclen < src.getNumColumns()) ? bclen : src.getNumColumns() - blockCol*bclen;
-
- MatrixBlock block = new MatrixBlock(maxRow, maxCol, sparse);
-
- int row_offset = blockRow*brlen;
- int col_offset = blockCol*bclen;
-
- //copy submatrix to block
- src.slice( row_offset, row_offset+maxRow-1,
- col_offset, col_offset+maxCol-1, block );
-
- //append block to sequence file
- MatrixIndexes indexes = new MatrixIndexes(blockRow+1, blockCol+1);
- list.addLast(new Tuple2<>(indexes, block));
- }
+ else {
+ MatrixCharacteristics mc = new MatrixCharacteristics(
+ src.getNumRows(), src.getNumColumns(), brlen, bclen, src.getNonZeros());
+ list = LongStream.range(0, mc.getNumBlocks()).parallel()
+ .mapToObj(i -> createIndexedBlock(src, mc, i))
+ .collect(Collectors.toList());
}
JavaPairRDD<MatrixIndexes,MatrixBlock> result = sc.parallelizePairs(list);
@@ -725,6 +708,28 @@
return result;
}
+
+ private static Tuple2<MatrixIndexes,MatrixBlock> createIndexedBlock(MatrixBlock mb, MatrixCharacteristics mc, long ix) {
+ try {
+ //compute 0-based block row/column from the linear block index ix (row-major over column blocks)
+ long blockRow = ix / mc.getNumColBlocks();
+ long blockCol = ix % mc.getNumColBlocks();
+ //compute block sizes (computeBlockSize is given the block index shifted by one)
+ int maxRow = UtilFunctions.computeBlockSize(mc.getRows(), blockRow+1, mc.getRowsPerBlock());
+ int maxCol = UtilFunctions.computeBlockSize(mc.getCols(), blockCol+1, mc.getColsPerBlock());
+ //copy sub-matrix [offset, offset+size-1] of mb into a new block with the same sparse/dense format flag
+ MatrixBlock block = new MatrixBlock(maxRow, maxCol, mb.isInSparseFormat());
+ int row_offset = (int)blockRow*mc.getRowsPerBlock();
+ int col_offset = (int)blockCol*mc.getColsPerBlock();
+ block = mb.slice( row_offset, row_offset+maxRow-1,
+ col_offset, col_offset+maxCol-1, block );
+ //create key-value pair; the key uses the block row/column shifted by one
+ return new Tuple2<>(new MatrixIndexes(blockRow+1, blockCol+1), block);
+ }
+ catch(DMLRuntimeException ex) {
+ throw new RuntimeException(ex); //rethrow as unchecked, keeping the original exception as cause
+ }
+ }
public static JavaPairRDD<Long,FrameBlock> toFrameJavaPairRDD(JavaSparkContext sc, FrameBlock src)
throws DMLRuntimeException