blob: 891dfbe79ad414fa8f3b0e2b5726a71dd21ebd02 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.runtime.matrix.data;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject.UpdateType;
import org.apache.sysds.runtime.data.DenseBlock;
import org.apache.sysds.runtime.data.DenseBlockFactory;
import org.apache.sysds.runtime.data.SparseBlock;
import org.apache.sysds.runtime.data.SparseBlockCSR;
import org.apache.sysds.runtime.functionobjects.DiagIndex;
import org.apache.sysds.runtime.functionobjects.RevIndex;
import org.apache.sysds.runtime.functionobjects.SortIndex;
import org.apache.sysds.runtime.functionobjects.SwapIndex;
import org.apache.sysds.runtime.instructions.spark.data.IndexedMatrixValue;
import org.apache.sysds.runtime.matrix.operators.ReorgOperator;
import org.apache.sysds.runtime.meta.DataCharacteristics;
import org.apache.sysds.runtime.util.CommonThreadPool;
import org.apache.sysds.runtime.util.DataConverter;
import org.apache.sysds.runtime.util.SortUtils;
import org.apache.sysds.runtime.util.UtilFunctions;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
/**
* MB:
* Library for selected matrix reorg operations including special cases
* and all combinations of dense and sparse representations.
*
* Current list of supported operations:
* - reshape,
* - r' (transpose),
* - rdiag (diagV2M/diagM2V),
* - rsort (sorting data/indexes)
* - rmempty (remove empty)
* - rexpand (outer/table-seq expansion)
*/
public class LibMatrixReorg
{
//minimum number of elements for multi-threaded execution
public static final long PAR_NUMCELL_THRESHOLD = 1024*1024; //1M
// SORTING threshold
public static final int PAR_NUMCELL_THRESHOLD_SORT = 1024;
//allow shallow dense/sparse copy for unchanged data (which is
//safe due to copy-on-write and safe update-in-place handling)
public static final boolean SHALLOW_COPY_REORG = true;
//use csr instead of mcsr sparse block for rexpand columns / diag v2m
public static final boolean SPARSE_OUTPUTS_IN_CSR = true;
private enum ReorgType {
TRANSPOSE,
REV,
DIAG,
RESHAPE,
SORT,
INVALID,
}
private LibMatrixReorg() {
//prevent instantiation via private constructor
}
/////////////////////////
// public interface //
/////////////////////////
public static boolean isSupportedReorgOperator( ReorgOperator op )
{
return (getReorgType(op) != ReorgType.INVALID);
}
public static MatrixBlock reorg( MatrixBlock in, MatrixBlock out, ReorgOperator op ) {
ReorgType type = getReorgType(op);
switch( type ) {
case TRANSPOSE:
if( op.getNumThreads() > 1 )
return transpose(in, out, op.getNumThreads());
else
return transpose(in, out);
case REV:
return rev(in, out);
case DIAG:
return diag(in, out);
case SORT:
SortIndex ix = (SortIndex) op.fn;
if (op.getNumThreads() > 1)
return sort(in, out, ix.getCols(), ix.getDecreasing(), ix.getIndexReturn(), op.getNumThreads());
else
return sort(in, out, ix.getCols(), ix.getDecreasing(), ix.getIndexReturn());
default:
throw new DMLRuntimeException("Unsupported reorg operator: "+op.fn);
}
}
public static MatrixBlock transpose( MatrixBlock in, MatrixBlock out ) {
//sparse-safe operation
if( in.isEmptyBlock(false) )
return out;
//set basic meta data
out.nonZeros = in.nonZeros;
//shallow dense vector transpose (w/o result allocation)
//since the physical representation of dense vectors is always the same,
//we don't need to create a copy, given our copy on write semantics.
//however, note that with update in-place this would be an invalid optimization
if( SHALLOW_COPY_REORG && !in.sparse && !out.sparse && (in.rlen==1 || in.clen==1) ) {
out.denseBlock = DenseBlockFactory.createDenseBlock(in.getDenseBlockValues(), in.clen, in.rlen);
return out;
}
//Timing time = new Timing(true);
//allocate output arrays (if required)
if( out.sparse )
out.allocateSparseRowsBlock(false);
else
out.allocateDenseBlock(false);
//execute transpose operation
if( !in.sparse && !out.sparse )
transposeDenseToDense( in, out, 0, in.rlen, 0, in.clen );
else if( in.sparse && out.sparse )
transposeSparseToSparse( in, out, 0, in.rlen, 0, in.clen,
countNnzPerColumn(in, 0, in.rlen));
else if( in.sparse )
transposeSparseToDense( in, out, 0, in.rlen, 0, in.clen );
else
transposeDenseToSparse( in, out );
//System.out.println("r' ("+in.rlen+", "+in.clen+", "+in.sparse+", "+out.sparse+") in "+time.stop()+" ms.");
return out;
}
public static MatrixBlock transpose( MatrixBlock in, MatrixBlock out, int k ) {
//redirect small or special cases to sequential execution
if( in.isEmptyBlock(false) || (in.rlen * in.clen < PAR_NUMCELL_THRESHOLD) || k == 1
|| (SHALLOW_COPY_REORG && !in.sparse && !out.sparse && (in.rlen==1 || in.clen==1) )
|| (in.sparse && !out.sparse && in.rlen==1) || (!in.sparse && out.sparse && in.rlen==1)
|| (!in.sparse && out.sparse) || !out.isThreadSafe())
{
return transpose(in, out);
}
//Timing time = new Timing(true);
//set meta data and allocate output arrays (if required)
out.nonZeros = in.nonZeros;
if( out.sparse )
out.allocateSparseRowsBlock(false);
else
out.allocateDenseBlock(false);
//core multi-threaded transpose
try {
ExecutorService pool = CommonThreadPool.get(k);
//pre-processing (compute nnz per column once for sparse)
int[] cnt = null;
if( in.sparse && out.sparse ) {
ArrayList<CountNnzTask> tasks = new ArrayList<>();
int blklen = (int)(Math.ceil((double)in.rlen/k));
for( int i=0; i<k & i*blklen<in.rlen; i++ )
tasks.add(new CountNnzTask(in, i*blklen, Math.min((i+1)*blklen, in.rlen)));
List<Future<int[]>> rtasks = pool.invokeAll(tasks);
for( Future<int[]> rtask : rtasks )
cnt = mergeNnzCounts(cnt, rtask.get());
}
//compute actual transpose and check for errors
ArrayList<TransposeTask> tasks = new ArrayList<>();
boolean row = (in.sparse || in.rlen >= in.clen) && !out.sparse;
int len = row ? in.rlen : in.clen;
int blklen = (int)(Math.ceil((double)len/k));
blklen += (blklen%8 != 0)?8-blklen%8:0;
for( int i=0; i<k & i*blklen<len; i++ )
tasks.add(new TransposeTask(in, out, row, i*blklen, Math.min((i+1)*blklen, len), cnt));
List<Future<Object>> taskret = pool.invokeAll(tasks);
pool.shutdown();
for( Future<Object> task : taskret )
task.get();
}
catch(Exception ex) {
throw new DMLRuntimeException(ex);
}
//System.out.println("r' k="+k+" ("+in.rlen+", "+in.clen+", "+in.sparse+", "+out.sparse+") in "+time.stop()+" ms.");
return out;
}
public static MatrixBlock rev( MatrixBlock in, MatrixBlock out ) {
//Timing time = new Timing(true);
//sparse-safe operation
if( in.isEmptyBlock(false) )
return out;
//special case: row vector
if( in.rlen == 1 ) {
out.copy(in);
return out;
}
if( in.sparse )
reverseSparse( in, out );
else
reverseDense( in, out );
//System.out.println("rev ("+in.rlen+", "+in.clen+", "+in.sparse+") in "+time.stop()+" ms.");
return out;
}
public static void rev( IndexedMatrixValue in, long rlen, int blen, ArrayList<IndexedMatrixValue> out ) {
//input block reverse
MatrixIndexes inix = in.getIndexes();
MatrixBlock inblk = (MatrixBlock) in.getValue();
MatrixBlock tmpblk = rev(inblk, new MatrixBlock(inblk.getNumRows(), inblk.getNumColumns(), inblk.isInSparseFormat()));
//split and expand block if necessary (at most 2 blocks)
if( rlen % blen == 0 ) //special case: aligned blocks
{
int nrblks = (int)Math.ceil((double)rlen/blen);
out.add(new IndexedMatrixValue(
new MatrixIndexes(nrblks-inix.getRowIndex()+1, inix.getColumnIndex()), tmpblk));
}
else //general case: unaligned blocks
{
//compute target positions and sizes
long pos1 = rlen - UtilFunctions.computeCellIndex(inix.getRowIndex(), blen, tmpblk.getNumRows()-1) + 1;
long pos2 = pos1 + tmpblk.getNumRows() - 1;
int ipos1 = UtilFunctions.computeCellInBlock(pos1, blen);
int iposCut = tmpblk.getNumRows() - ipos1 - 1;
int blkix1 = (int)UtilFunctions.computeBlockIndex(pos1, blen);
int blkix2 = (int)UtilFunctions.computeBlockIndex(pos2, blen);
int blklen1 = UtilFunctions.computeBlockSize(rlen, blkix1, blen);
int blklen2 = UtilFunctions.computeBlockSize(rlen, blkix2, blen);
//slice first block
MatrixIndexes outix1 = new MatrixIndexes(blkix1, inix.getColumnIndex());
MatrixBlock outblk1 = new MatrixBlock(blklen1, inblk.getNumColumns(), inblk.isInSparseFormat());
MatrixBlock tmp1 = tmpblk.slice(0, iposCut);
outblk1.leftIndexingOperations(tmp1, ipos1, ipos1+tmp1.getNumRows()-1,
0, tmpblk.getNumColumns()-1, outblk1, UpdateType.INPLACE_PINNED);
out.add(new IndexedMatrixValue(outix1, outblk1));
//slice second block (if necessary)
if( blkix1 != blkix2 ) {
MatrixIndexes outix2 = new MatrixIndexes(blkix2, inix.getColumnIndex());
MatrixBlock outblk2 = new MatrixBlock(blklen2, inblk.getNumColumns(), inblk.isInSparseFormat());
MatrixBlock tmp2 = tmpblk.slice(iposCut+1, tmpblk.getNumRows()-1);
outblk2.leftIndexingOperations(tmp2, 0, tmp2.getNumRows()-1, 0, tmpblk.getNumColumns()-1, outblk2, UpdateType.INPLACE_PINNED);
out.add(new IndexedMatrixValue(outix2, outblk2));
}
}
}
public static MatrixBlock diag( MatrixBlock in, MatrixBlock out ) {
//Timing time = new Timing(true);
//sparse-safe operation
if( in.isEmptyBlock(false) )
return out;
int rlen = in.rlen;
int clen = in.clen;
if( clen == 1 ) //diagV2M
diagV2M( in, out );
else if ( rlen == clen ) //diagM2V
diagM2V( in, out );
else
throw new DMLRuntimeException("Reorg diagM2V requires squared block input. ("+rlen+", "+clen+")");
//System.out.println("rdiag ("+in.rlen+", "+in.clen+", "+in.sparse+", "+out.sparse+") in "+time.stop()+" ms.");
return out;
}
public static MatrixBlock sort(MatrixBlock in, MatrixBlock out, int[] by, boolean desc, boolean ixret) {
return sort(in,out,by,desc,ixret,1);
}
/**
* @param in Input matrix to sort
* @param out Output matrix where the sorted input is inserted to
* @param by The Ordering parameter
* @param desc A boolean, specifying if it should be descending order.
* @param ixret A boolean, specifying if the return should be the sorted indexes.
* @param k Number of parallel threads
* @return The sorted out matrix.
*/
public static MatrixBlock sort(MatrixBlock in, MatrixBlock out, int[] by, boolean desc, boolean ixret, int k) {
//Timing time = new Timing(true);
//meta data gathering and preparation
boolean sparse = in.isInSparseFormat();
int rlen = in.rlen;
int clen = in.clen;
out.sparse = (in.sparse && !ixret);
out.nonZeros = ixret ? rlen : in.nonZeros;
//step 1: error handling
if( !isValidSortByList(by, clen) )
throw new DMLRuntimeException("Sort configuration issue: invalid orderby columns: "
+ Arrays.toString(by)+" ("+rlen+"x"+clen+" input).");
//step 2: empty block / special case handling
if( !ixret ) //SORT DATA
{
if( in.isEmptyBlock(false) ) //EMPTY INPUT BLOCK
return out;
if( !sparse && clen == 1 ) { //DENSE COLUMN VECTOR
//in-place quicksort, unstable (no indexes needed)
out.copy( in ); //dense (always single block)
if (k > 1)
Arrays.parallelSort(out.getDenseBlockValues());
else
Arrays.sort(out.getDenseBlockValues());
if( desc )
sortReverseDense(out);
return out;
}
}
else //SORT INDEX
{
if( in.isEmptyBlock(false) ) { //EMPTY INPUT BLOCK
out.allocateDenseBlock(false); //single block
double[] c = out.getDenseBlockValues();
// create a list containing the indexes of each element in in.
for( int i=0; i<rlen; i++ )
c[i] = i+1; //seq(1,n)
return out;
}
}
//step 3: index vector sorting
//create index vector and extract values
//TODO perf: reconsider partition sort to avoid unnecessary barriers
int[] vix = new int[rlen];
double[] values = new double[rlen];
for( int i=0; i<rlen; i++ ) {
vix[i] = i;
values[i] = in.quickGetValue(i, by[0]-1);
}
// step 4: split the data into number of blocks of PAR_NUMCELL_THRESHOLD_SORT (1024) elements.
if (k == 1 || rlen < PAR_NUMCELL_THRESHOLD_SORT){ // There is no parallel
//sort index vector on extracted data (unstable)
SortUtils.sortByValue(0, rlen, values, vix);
} else {
try {
ExecutorService pool = CommonThreadPool.get(k);
ArrayList<SortTask> tasks = new ArrayList<>();
// sort smaller blocks.
int blklen = (int)(Math.ceil((double)rlen/k));
for( int i=0; i*blklen<rlen; i++ ){
int start = i*blklen;
int stop = Math.min(rlen , i*blklen + blklen);
tasks.add(new SortTask(start, stop, vix, values));
}
CommonThreadPool.invokeAndShutdown(pool, tasks);
mergeSortedBlocks(blklen, vix, values, k);
}
catch(Exception ex) {
throw new DMLRuntimeException(ex);
}
}
//sort by secondary columns if required (in-place)
if( by.length > 1 )
sortBySecondary(0, rlen, values, vix, in, by, 1);
//flip order if descending requested (note that this needs to happen
//before we ensure stable outputs, hence we also flip values)
if(desc) {
sortReverseDense(vix);
sortReverseDense(values);
}
//final pass to ensure stable output
sortIndexesStable(0, rlen, values, vix, in, by, 1);
//step 4: create output matrix (guaranteed non-empty, see step 2)
if( !ixret ) {
out.allocateBlock();
//copy input data in sorted order into result
ExecutorService pool = CommonThreadPool.get(k);
ArrayList<CopyTask> tasks = new ArrayList<>();
ArrayList<Integer> blklen = UtilFunctions
.getBalancedBlockSizesDefault(rlen, k, false);
for( int i=0, lb=0; i<blklen.size(); lb+=blklen.get(i), i++ )
tasks.add( new CopyTask(in, out, vix, lb, lb+blklen.get(i)));
CommonThreadPool.invokeAndShutdown(pool, tasks);
}
else {
//copy sorted index vector into result
out.allocateDenseBlock(false);
DenseBlock c = out.getDenseBlock();
for( int i=0; i<rlen; i++ )
c.set(i, 0, vix[i]+1);
}
return out;
}
/**
* CP reshape operation (single input, single output matrix)
*
* NOTE: In contrast to R, the rowwise parameter specifies both
* the read and write order, with row-wise being the default, while
* R uses always a column-wise read, rowwise specifying the write
* order and column-wise being the default.
*
* @param in input matrix
* @param out output matrix
* @param rows number of rows
* @param cols number of columns
* @param rowwise if true, reshape by row
* @return output matrix
*/
public static MatrixBlock reshape( MatrixBlock in, MatrixBlock out, int rows, int cols, boolean rowwise ) {
int rlen = in.rlen;
int clen = in.clen;
//check validity
if( ((long)rlen)*clen != ((long)rows)*cols )
throw new DMLRuntimeException("Reshape matrix requires consistent numbers of input/output cells ("+rlen+":"+clen+", "+rows+":"+cols+").");
//check for same dimensions
if( rlen==rows && clen == cols ) {
//copy incl dims, nnz
if( SHALLOW_COPY_REORG )
out.copyShallow(in);
else
out.copy(in);
return out;
}
//determine output representation
out.sparse = MatrixBlock.evalSparseFormatInMemory(rows, cols, in.nonZeros);
//set output dimensions
out.rlen = rows;
out.clen = cols;
out.nonZeros = in.nonZeros;
//core reshape (sparse or dense)
if(!in.sparse && !out.sparse)
reshapeDense(in, out, rows, cols, rowwise);
else if(in.sparse && out.sparse)
reshapeSparse(in, out, rows, cols, rowwise);
else if(in.sparse)
reshapeSparseToDense(in, out, rows, cols, rowwise);
else
reshapeDenseToSparse(in, out, rows, cols, rowwise);
return out;
}
/**
* MR/SPARK reshape interface - for reshape we cannot view blocks independently, and hence,
* there are different CP and MR interfaces.
*
* @param in indexed matrix value
* @param mcIn input matrix characteristics
* @param mcOut output matrix characteristics
* @param rowwise if true, reshape by row
* @param outputEmptyBlocks output blocks with nnz=0
* @return list of indexed matrix values
*/
public static List<IndexedMatrixValue> reshape(IndexedMatrixValue in, DataCharacteristics mcIn,
DataCharacteristics mcOut, boolean rowwise, boolean outputEmptyBlocks ) {
//prepare inputs
MatrixIndexes ixIn = in.getIndexes();
MatrixBlock mbIn = (MatrixBlock) in.getValue();
//prepare result blocks (no reuse in order to guarantee mem constraints)
Collection<MatrixIndexes> rix = computeAllResultBlockIndexes(ixIn, mcIn, mcOut, mbIn, rowwise, outputEmptyBlocks);
Map<MatrixIndexes, MatrixBlock> rblk = createAllResultBlocks(rix, mbIn.nonZeros, mcOut);
//basic algorithm
long row_offset = (ixIn.getRowIndex()-1)*mcIn.getBlocksize();
long col_offset = (ixIn.getColumnIndex()-1)*mcIn.getBlocksize();
if( mbIn.sparse )
reshapeSparse(mbIn, row_offset, col_offset, rblk, mcIn, mcOut, rowwise);
else //dense
reshapeDense(mbIn, row_offset, col_offset, rblk, mcIn, mcOut, rowwise);
//prepare output (sparsity switch, wrapper)
return rblk.entrySet().stream()
.filter( e -> outputEmptyBlocks || !e.getValue().isEmptyBlock(false))
.map(e -> {e.getValue().examSparsity(); return new IndexedMatrixValue(e.getKey(),e.getValue());})
.collect(Collectors.toList());
}
/**
* CP rmempty operation (single input, single output matrix)
*
* @param in input matrix
* @param ret output matrix
* @param rows ?
* @param emptyReturn return row/column of zeros for empty input
* @param select ?
* @return matrix block
*/
public static MatrixBlock rmempty(MatrixBlock in, MatrixBlock ret, boolean rows, boolean emptyReturn, MatrixBlock select) {
//check for empty inputs
//(the semantics of removeEmpty are that for an empty m-by-n matrix, the output
//is an empty 1-by-n or m-by-1 matrix because we don't allow matrices with dims 0)
if( in.isEmptyBlock(false) && select == null ) {
int n = emptyReturn ? 1 : 0;
if( rows )
ret.reset(n, in.clen, in.sparse);
else //cols
ret.reset(in.rlen, n, in.sparse);
return ret;
}
if( rows )
return removeEmptyRows(in, ret, select, emptyReturn);
else //cols
return removeEmptyColumns(in, ret, select, emptyReturn);
}
/**
* MR rmempty interface - for rmempty we cannot view blocks independently, and hence,
* there are different CP and MR interfaces.
*
* @param data ?
* @param offset ?
* @param rmRows ?
* @param len ?
* @param blen block length
* @param outList list of indexed matrix values
*/
public static void rmempty(IndexedMatrixValue data, IndexedMatrixValue offset, boolean rmRows, long len, long blen, ArrayList<IndexedMatrixValue> outList) {
//sanity check inputs
if( !(data.getValue() instanceof MatrixBlock && offset.getValue() instanceof MatrixBlock) )
throw new DMLRuntimeException("Unsupported input data: expected "+MatrixBlock.class.getName()+" but got "+data.getValue().getClass().getName()+" and "+offset.getValue().getClass().getName());
if( rmRows && data.getValue().getNumRows()!=offset.getValue().getNumRows()
|| !rmRows && data.getValue().getNumColumns()!=offset.getValue().getNumColumns() ){
throw new DMLRuntimeException("Dimension mismatch between input data and offsets: ["
+data.getValue().getNumRows()+"x"+data.getValue().getNumColumns()+" vs "+offset.getValue().getNumRows()+"x"+offset.getValue().getNumColumns());
}
//compute outputs (at most two output blocks)
HashMap<MatrixIndexes,IndexedMatrixValue> out = new HashMap<>();
MatrixBlock linData = (MatrixBlock) data.getValue();
MatrixBlock linOffset = (MatrixBlock) offset.getValue();
MatrixIndexes tmpIx = new MatrixIndexes(-1,-1);
if( rmRows ) //margin = "rows"
{
long rlen = len;
long clen = linData.getNumColumns();
for( int i=0; i<linOffset.getNumRows(); i++ ) {
long rix = (long)linOffset.quickGetValue(i, 0);
if( rix > 0 ) //otherwise empty row
{
//get single row from source block
MatrixBlock src = linData.slice(i, i, 0, (int)(clen-1), new MatrixBlock());
long brix = (rix-1)/blen+1;
long lbrix = (rix-1)%blen;
tmpIx.setIndexes(brix, data.getIndexes().getColumnIndex());
//create target block if necessary
if( !out.containsKey(tmpIx) ) {
IndexedMatrixValue tmpIMV = new IndexedMatrixValue(new MatrixIndexes(),new MatrixBlock());
tmpIMV.getIndexes().setIndexes(tmpIx);
((MatrixBlock)tmpIMV.getValue()).reset((int)Math.min(blen, rlen-((brix-1)*blen)), (int)clen);
out.put(tmpIMV.getIndexes(), tmpIMV);
}
//put single row into target block
((MatrixBlock)out.get(tmpIx).getValue()).copy(
(int)lbrix, (int)lbrix, 0, (int)clen-1, src, false);
}
}
}
else //margin = "cols"
{
long rlen = linData.getNumRows();
long clen = len;
for( int i=0; i<linOffset.getNumColumns(); i++ ) {
long cix = (long)linOffset.quickGetValue(0, i);
if( cix > 0 ) //otherwise empty row
{
//get single row from source block
MatrixBlock src = linData.slice(0, (int)(rlen-1), i, i, new MatrixBlock());
long bcix = (cix-1)/blen+1;
long lbcix = (cix-1)%blen;
tmpIx.setIndexes(data.getIndexes().getRowIndex(), bcix);
//create target block if necessary
if( !out.containsKey(tmpIx) ) {
IndexedMatrixValue tmpIMV = new IndexedMatrixValue(new MatrixIndexes(),new MatrixBlock());
tmpIMV.getIndexes().setIndexes(tmpIx);
((MatrixBlock)tmpIMV.getValue()).reset((int)rlen,(int)Math.min(blen, clen-((bcix-1)*blen)));
out.put(tmpIMV.getIndexes(), tmpIMV);
}
//put single row into target block
((MatrixBlock)out.get(tmpIx).getValue()).copy(
0, (int)rlen-1, (int)lbcix, (int)lbcix, src, false);
}
}
}
//prepare and return outputs (already in cached values)
for( IndexedMatrixValue imv : out.values() ){
((MatrixBlock)imv.getValue()).recomputeNonZeros();
outList.add(imv);
}
}
/**
* CP rexpand operation (single input, single output)
*
* @param in input matrix
* @param ret output matrix
* @param max ?
* @param rows ?
* @param cast ?
* @param ignore ?
* @param k degree of parallelism
* @return output matrix
*/
public static MatrixBlock rexpand(MatrixBlock in, MatrixBlock ret, double max, boolean rows, boolean cast, boolean ignore, int k) {
//prepare parameters
int lmax = (int)UtilFunctions.toLong(max);
//sanity check for input nnz (incl implicit handling of empty blocks)
if( !ignore && in.getNonZeros()<in.getNumRows() )
throw new DMLRuntimeException("Invalid input w/ zeros for rexpand ignore=false "
+ "(rlen="+in.getNumRows()+", nnz="+in.getNonZeros()+").");
//check for empty inputs (for ignore=true)
if( in.isEmptyBlock(false) ) {
if( rows )
ret.reset(lmax, in.rlen, true);
else //cols
ret.reset(in.rlen, lmax, true);
return ret;
}
//execute rexpand operations
if( rows )
return rexpandRows(in, ret, lmax, cast, ignore);
else //cols
return rexpandColumns(in, ret, lmax, cast, ignore, k);
}
/**
* MR/Spark rexpand operation (single input, multiple outputs incl empty blocks)
*
* @param data indexed matrix value
* @param max ?
* @param rows ?
* @param cast ?
* @param ignore ?
* @param blen block length
* @param outList list of indexed matrix values
*/
public static void rexpand(IndexedMatrixValue data, double max, boolean rows, boolean cast, boolean ignore, long blen, ArrayList<IndexedMatrixValue> outList) {
//prepare parameters
MatrixIndexes ix = data.getIndexes();
MatrixBlock in = (MatrixBlock)data.getValue();
//execute rexpand operations incl sanity checks
//TODO more robust (memory efficient) implementation w/o tmp block
MatrixBlock tmp = rexpand(in, new MatrixBlock(), max, rows, cast, ignore, 1);
//prepare outputs blocks (slice tmp block into output blocks )
if( rows ) //expanded vertically
{
for( int rl=0; rl<tmp.getNumRows(); rl+=blen ) {
MatrixBlock mb = tmp.slice(
rl, (int)(Math.min(rl+blen, tmp.getNumRows())-1));
outList.add(new IndexedMatrixValue(
new MatrixIndexes(rl/blen+1, ix.getRowIndex()), mb));
}
}
else //expanded horizontally
{
for( int cl=0; cl<tmp.getNumColumns(); cl+=blen ) {
MatrixBlock mb = tmp.slice(
0, tmp.getNumRows()-1,
cl, (int)(Math.min(cl+blen, tmp.getNumColumns())-1), new MatrixBlock());
outList.add(new IndexedMatrixValue(
new MatrixIndexes(ix.getRowIndex(), cl/blen+1), mb));
}
}
}
///////////////////////////////
// private CP implementation //
///////////////////////////////
private static ReorgType getReorgType( ReorgOperator op )
{
if( op.fn instanceof SwapIndex ) //transpose
return ReorgType.TRANSPOSE;
else if( op.fn instanceof RevIndex ) //rev
return ReorgType.REV;
else if( op.fn instanceof DiagIndex ) //diag
return ReorgType.DIAG;
else if( op.fn instanceof SortIndex ) //sort
return ReorgType.SORT;
return ReorgType.INVALID;
}
private static void transposeDenseToDense(MatrixBlock in, MatrixBlock out, int rl, int ru, int cl, int cu) {
final int m = in.rlen;
final int n = in.clen;
final int n2 = out.clen;
DenseBlock a = in.getDenseBlock();
DenseBlock c = out.getDenseBlock();
if( m==1 || n==1 ) //VECTOR TRANSPOSE
{
//plain memcopy, in case shallow dense copy no applied
//input/output guaranteed single block
int ix = rl+cl; int len = ru+cu-ix-1;
System.arraycopy(a.valuesAt(0), ix, c.valuesAt(0), ix, len);
}
else //MATRIX TRANSPOSE
{
//blocking according to typical L2 cache sizes
final int blocksizeI = 128;
final int blocksizeJ = 128;
//blocked execution
if( a.numBlocks()==1 && c.numBlocks()==1 ) { //<16GB
double[] avals = a.valuesAt(0);
double[] cvals = c.valuesAt(0);
for( int bi = rl; bi<ru; bi+=blocksizeI ) {
int bimin = Math.min(bi+blocksizeI, ru);
for( int bj = cl; bj<cu; bj+=blocksizeJ ) {
int bjmin = Math.min(bj+blocksizeJ, cu);
//core transpose operation
for( int i=bi; i<bimin; i++ ) {
int aix = i * n + bj;
int cix = bj * n2 + i;
transposeRow(avals, cvals, aix, cix, n2, bjmin-bj);
}
}
}
}
else { //general case > 16GB (multiple blocks)
for( int bi = rl; bi<ru; bi+=blocksizeI ) {
int bimin = Math.min(bi+blocksizeI, ru);
for( int bj = cl; bj<cu; bj+=blocksizeJ ) {
int bjmin = Math.min(bj+blocksizeJ, cu);
//core transpose operation
for( int i=bi; i<bimin; i++ ) {
double[] avals = a.values(i);
int aix = a.pos(i);
for( int j=bj; j<bjmin; j++ )
c.set(j, i, avals[ aix+j ]);
}
}
}
}
}
}
private static void transposeDenseToSparse(MatrixBlock in, MatrixBlock out)
{
//NOTE: called only in sequential execution
final int m = in.rlen;
final int n = in.clen;
final int m2 = out.rlen;
final int n2 = out.clen;
final int ennz2 = (int) (in.nonZeros/m2);
DenseBlock a = in.getDenseBlock();
SparseBlock c = out.getSparseBlock();
if( out.rlen == 1 ) //VECTOR-VECTOR
{
c.allocate(0, (int)in.nonZeros);
c.setIndexRange(0, 0, m, a.valuesAt(0), 0, m);
}
else //general case: MATRIX-MATRIX
{
//blocking according to typical L2 cache sizes
final int blocksizeI = 128;
final int blocksizeJ = 128;
//blocked execution
for( int bi = 0; bi<m; bi+=blocksizeI ) {
int bimin = Math.min(bi+blocksizeI, m);
for( int bj = 0; bj<n; bj+=blocksizeJ ) {
int bjmin = Math.min(bj+blocksizeJ, n);
//core transpose operation
for( int i=bi; i<bimin; i++ ) {
double[] avals = a.values(i);
int aix = a.pos(i);
for( int j=bj; j<bjmin; j++ ) {
c.allocate(j, ennz2, n2);
c.append(j, i, avals[aix+j]);
}
}
}
}
}
}
private static void transposeSparseToSparse(MatrixBlock in, MatrixBlock out, int rl, int ru, int cl, int cu, int[] cnt)
{
//NOTE: called only in sequential or column-wise parallel execution
if( rl > 0 || ru < in.rlen )
throw new RuntimeException("Unsupported row-parallel transposeSparseToSparse: "+rl+", "+ru);
final int m2 = out.rlen;
final int n2 = out.clen;
final int ennz2 = (int) (in.nonZeros/m2);
SparseBlock a = in.getSparseBlock();
SparseBlock c = out.getSparseBlock();
//allocate output sparse rows
if( cnt != null ) {
for( int i=cl; i<cu; i++ )
if( cnt[i] > 0 )
c.allocate(i, cnt[i]);
}
//blocking according to typical L2 cache sizes w/ awareness of sparsity
final long xsp = (long)in.rlen*in.clen/in.nonZeros;
final int blocksizeI = Math.max(128, (int) (8*xsp));
final int blocksizeJ = Math.max(128, (int) (8*xsp));
//temporary array for block boundaries (for preventing binary search)
int[] ix = new int[Math.min(blocksizeI, ru-rl)];
//blocked execution
for( int bi=rl; bi<ru; bi+=blocksizeI )
{
Arrays.fill(ix, 0);
//find column starting positions
int bimin = Math.min(bi+blocksizeI, ru);
if( cl > 0 ) {
for( int i=bi; i<bimin; i++ ) {
if( a.isEmpty(i) ) continue;
int j = a.posFIndexGTE(i, cl);
ix[i-bi] = (j>=0) ? j : a.size(i);
}
}
for( int bj=cl; bj<cu; bj+=blocksizeJ ) {
int bjmin = Math.min(bj+blocksizeJ, cu);
//core block transpose operation
for( int i=bi; i<bimin; i++ ) {
if( a.isEmpty(i) ) continue;
int apos = a.pos(i);
int alen = a.size(i);
int[] aix = a.indexes(i);
double[] avals = a.values(i);
int j = ix[i-bi] + apos; //last block boundary
for( ; j<apos+alen && aix[j]<bjmin; j++ ) {
c.allocate(aix[j], ennz2, n2);
c.append(aix[j], i, avals[j]);
}
ix[i-bi] = j - apos; //keep block boundary
}
}
}
}
private static void transposeSparseToDense(MatrixBlock in, MatrixBlock out, int rl, int ru, int cl, int cu) {
final int m = in.rlen;
final int n = in.clen;
SparseBlock a = in.getSparseBlock();
DenseBlock c = out.getDenseBlock();
if( m==1 ) //ROW VECTOR TRANSPOSE
{
//NOTE: called only in sequential execution
int alen = a.size(0); //always pos 0
int[] aix = a.indexes(0);
double[] avals = a.values(0);
double[] cvals = c.valuesAt(0);
for( int j=0; j<alen; j++ )
cvals[aix[j]] = avals[j];
}
else //MATRIX TRANSPOSE
{
//blocking according to typical L2 cache sizes
final int blocksizeI = 128;
final int blocksizeJ = 128;
//temporary array for block boundaries (for preventing binary search)
int[] ix = new int[blocksizeI];
//blocked execution
for( int bi = rl; bi<ru; bi+=blocksizeI ) {
Arrays.fill(ix, 0);
int bimin = Math.min(bi+blocksizeI, ru);
for( int bj = 0; bj<n; bj+=blocksizeJ ) {
int bjmin = Math.min(bj+blocksizeJ, n);
//core transpose operation
for( int i=bi, iix=0; i<bimin; i++, iix++ ) {
if( a.isEmpty(i) ) continue;
int apos = a.pos(i);
int alen = a.size(i);
int[] aix = a.indexes(i);
double[] avals = a.values(i);
int j = ix[iix]; //last block boundary
for( ; j<alen && aix[apos+j]<bjmin; j++ )
c.set(aix[apos+j], i, avals[apos+j]);
ix[iix] = j; //keep block boundary
}
}
}
}
}
static void transposeRow( double[] a, double[] c, int aix, int cix, int n2, int len ) {
final int bn = len%8;
//compute rest (not aligned to 8-blocks)
for( int j=0; j<bn; j++, aix++, cix+=n2 )
c[ cix ] = a[ aix+0 ];
//unrolled 8-blocks
for( int j=bn; j<len; j+=8, aix+=8, cix+=8*n2 ) {
c[ cix + 0*n2 ] = a[ aix+0 ];
c[ cix + 1*n2 ] = a[ aix+1 ];
c[ cix + 2*n2 ] = a[ aix+2 ];
c[ cix + 3*n2 ] = a[ aix+3 ];
c[ cix + 4*n2 ] = a[ aix+4 ];
c[ cix + 5*n2 ] = a[ aix+5 ];
c[ cix + 6*n2 ] = a[ aix+6 ];
c[ cix + 7*n2 ] = a[ aix+7 ];
}
}
private static int[] countNnzPerColumn(MatrixBlock in, int rl, int ru) {
//initial pass to determine capacity (this helps to prevent
//sparse row reallocations and mem inefficiency w/ skew
int[] cnt = null;
if( in.sparse && in.clen <= 4096 ) { //16KB
SparseBlock a = in.sparseBlock;
cnt = new int[in.clen];
for( int i=rl; i<ru; i++ ) {
if( !a.isEmpty(i) )
countAgg(cnt, a.indexes(i), a.pos(i), a.size(i));
}
}
return cnt;
}
private static int[] mergeNnzCounts(int[] cnt, int[] cnt2) {
if( cnt == null )
return cnt2;
for( int i=0; i<cnt.length; i++ )
cnt[i] += cnt2[i];
return cnt;
}
private static void reverseDense(MatrixBlock in, MatrixBlock out) {
final int m = in.rlen;
final int n = in.clen;
//set basic meta data and allocate output
out.sparse = false;
out.nonZeros = in.nonZeros;
out.allocateDenseBlock(false);
//copy all rows into target positions
if( n == 1 ) { //column vector
double[] a = in.getDenseBlockValues();
double[] c = out.getDenseBlockValues();
for( int i=0; i<m; i++ )
c[m-1-i] = a[i];
}
else { //general matrix case
DenseBlock a = in.getDenseBlock();
DenseBlock c = out.getDenseBlock();
for( int i=0; i<m; i++ ) {
final int ri = m - 1 - i;
System.arraycopy(a.values(i), a.pos(i), c.values(ri), c.pos(ri), n);
}
}
}
private static void reverseSparse(MatrixBlock in, MatrixBlock out) {
final int m = in.rlen;
//set basic meta data and allocate output
out.sparse = true;
out.nonZeros = in.nonZeros;
out.allocateSparseRowsBlock(false);
//copy all rows into target positions
SparseBlock a = in.getSparseBlock();
SparseBlock c = out.getSparseBlock();
for( int i=0; i<m; i++ )
if( !a.isEmpty(i) )
c.set(m-1-i, a.get(i), true);
}
/**
* Generic implementation diagV2M
* (in most-likely DENSE, out most likely SPARSE)
*
* @param in input matrix
* @param out output matrix
*/
private static void diagV2M( MatrixBlock in, MatrixBlock out )
{
final int rlen = in.rlen;
//CASE column vector
if( out.sparse ) { //SPARSE
if( SPARSE_OUTPUTS_IN_CSR ) {
int[] rptr = new int[in.rlen+1];
int[] cix = null;
double[] vals = null;
//case a: fully dense vector
if( rlen == in.nonZeros && !in.sparse ) {
//reuse single seq for rptr and cix (cix truncated by 1)
rptr = cix = UtilFunctions.getSeqArray(0, rlen, 1);
vals = in.getDenseBlockValues(); //shallow copy
}
//case b: more general input
else {
cix = new int[(int)in.nonZeros];
vals = new double[(int)in.nonZeros];
for( int i=0, pos=0; i<rlen; i++ ) {
double val = in.quickGetValue(i, 0);
if( val != 0 ) {
cix[pos] = i;
vals[pos] = val;
pos++;
}
rptr[i+1]=pos;
}
}
out.sparseBlock = new SparseBlockCSR(
rptr, cix, vals, (int)in.nonZeros);
}
else {
out.allocateBlock();
SparseBlock sblock = out.sparseBlock;
for(int i=0; i<rlen; i++) {
double val = in.quickGetValue(i, 0);
if( val != 0 ) {
sblock.allocate(i, 1);
sblock.append(i, i, val);
}
}
}
}
else { //DENSE
for( int i=0; i<rlen; i++ ) {
double val = in.quickGetValue(i, 0);
if( val != 0 )
out.appendValue(i, i, val);
}
}
out.setNonZeros(in.nonZeros);
}
/**
* Generic implementation diagM2V (non-performance critical)
* (in most-likely SPARSE, out most likely DENSE)
*
* NOTE: squared block assumption (checked on entry diag)
*
* @param in input matrix
* @param out output matrix
*/
private static void diagM2V( MatrixBlock in, MatrixBlock out )
{
DenseBlock c = out.allocateBlock().getDenseBlock();
int rlen = in.rlen;
int nnz = 0;
for( int i=0; i<rlen; i++ ) {
double val = in.quickGetValue(i, i);
if( val != 0 ) {
c.set(i, 0, val);
nnz++;
}
}
out.setNonZeros(nnz);
}
private static void reshapeDense( MatrixBlock in, MatrixBlock out, int rows, int cols, boolean rowwise ) {
int rlen = in.rlen;
int clen = in.clen;
//reshape empty block
if( in.denseBlock == null )
return;
//shallow dense by-row reshape (w/o result allocation)
if( SHALLOW_COPY_REORG && rowwise && in.denseBlock.numBlocks()==1 ) {
//since the physical representation of dense matrices is always the same,
//we don't need to create a copy, given our copy on write semantics.
//however, note that with update in-place this would be an invalid optimization
out.denseBlock = DenseBlockFactory.createDenseBlock(in.getDenseBlockValues(), rows, cols);
return;
}
//allocate block if necessary
out.allocateDenseBlock(false);
//dense reshape
DenseBlock a = in.getDenseBlock();
DenseBlock c = out.getDenseBlock();
if( rowwise ) {
//VECTOR-MATRIX, MATRIX-VECTOR, GENERAL CASE
//pure copy of rowwise internal representation
c.set(a);
}
else { //colwise
if( rlen==1 || clen==1 ) { //VECTOR->MATRIX
//note: cache-friendly on a but not on c
double[] avals = a.valuesAt(0);
double[] cvals = c.valuesAt(0);
for( int j=0, aix=0; j<cols; j++ )
for( int i=0, cix=0; i<rows; i++, cix+=cols )
cvals[ cix + j ] = avals[ aix++ ];
}
else if( rows==1 || cols==1 ) //MATRIX->VECTOR
{
//note: cache-friendly on c but not on a
double[] avals = a.valuesAt(0);
double[] cvals = c.valuesAt(0);
for( int j=0, cix=0; j<clen; j++ )
for( int i=0, aix=0; i<rlen; i++, aix+=clen )
cvals[ cix++ ] = avals[ aix + j ];
}
else //GENERAL CASE: MATRIX->MATRIX
{
//note: cache-friendly on c but not an a
for( int i=0; i<rows; i++ ) {
double[] cvals = c.values(i);
int cix = c.pos(i);
for( int j=0, aix2=i; j<cols; j++, aix2+=rows ) {
int ai = aix2%rlen;
int aj = aix2/rlen;
cvals[cix+j] = a.get(ai,aj);
}
}
//index conversion c[i,j]<- a[k,l]:
// k = (rows*j+i)%rlen
// l = (rows*j+i)/rlen
}
}
}
private static void reshapeSparse( MatrixBlock in, MatrixBlock out, int rows, int cols, boolean rowwise )
{
int rlen = in.rlen;
int clen = in.clen;
//reshape empty block
if( in.isEmptyBlock(false) )
return;
//allocate block if necessary
out.allocateSparseRowsBlock(false);
int estnnz = (int) (in.nonZeros/rows);
//sparse reshape
SparseBlock a = in.sparseBlock;
SparseBlock c = out.sparseBlock;
if( rowwise )
{
//NOTES on special cases
// * vector-matrix not really different from general
// * clen=1 and cols=1 will never be sparse.
if( rows==1 ) //MATRIX->VECTOR
{
//note: cache-friendly on a and c; append-only
c.allocate(0, estnnz, cols);
for( int i=0, cix=0; i<rlen; i++, cix+=clen ) {
if( a.isEmpty(i) ) continue;
int apos = a.pos(i);
int alen = a.size(i);
int[] aix = a.indexes(i);
double[] avals = a.values(i);
for( int j=apos; j<apos+alen; j++ )
c.append(0, cix+aix[j], avals[j]);
}
}
else if( cols%clen==0 //SPECIAL CSR N:1 MATRIX->MATRIX
&& SHALLOW_COPY_REORG && SPARSE_OUTPUTS_IN_CSR
&& a instanceof SparseBlockCSR ) { //int nnz
int[] aix = ((SparseBlockCSR)a).indexes();
int n = cols/clen, pos = 0;
int[] rptr = new int[rows+1];
int[] indexes = new int[(int)a.size()];
rptr[0] = 0;
for(int bi=0, ci=0; bi<rlen; bi+=n, ci++) {
for( int i=bi, cix=0; i<bi+n; i++, cix+=clen ) {
if(a.isEmpty(i)) continue;
int apos = a.pos(i);
int alen = a.size(i);
for( int j=apos; j<apos+alen; j++ )
indexes[pos++] = cix+aix[j];
}
rptr[ci+1] = pos;
}
//create CSR block with shallow copy of values
out.sparseBlock = new SparseBlockCSR(rptr, indexes,
((SparseBlockCSR)a).values(), pos);
}
else if( cols%clen==0 ) { //SPECIAL N:1 MATRIX->MATRIX
int n = cols/clen;
for(int bi=0, ci=0; bi<rlen; bi+=n, ci++) {
//allocate output row once (w/o re-allocations)
long lnnz = a.size(bi, bi+n);
c.allocate(ci, (int)lnnz);
//copy N input rows into output row
for( int i=bi, cix=0; i<bi+n; i++, cix+=clen ) {
if(a.isEmpty(i)) continue;
int apos = a.pos(i);
int alen = a.size(i);
int[] aix = a.indexes(i);
double[] avals = a.values(i);
for( int j=apos; j<apos+alen; j++ )
c.append(ci, cix+aix[j], avals[j]);
}
}
}
else //GENERAL CASE: MATRIX->MATRIX
{
//note: cache-friendly on a but not c; append-only
//long cix because total cells in sparse can be larger than int
long cix = 0;
for( int i=0; i<rlen; i++ ) {
if( !a.isEmpty(i) ){
int apos = a.pos(i);
int alen = a.size(i);
int[] aix = a.indexes(i);
double[] avals = a.values(i);
for( int j=apos; j<apos+alen; j++ ) {
int ci = (int)((cix+aix[j])/cols);
int cj = (int)((cix+aix[j])%cols);
c.allocate(ci, estnnz, cols);
c.append(ci, cj, avals[j]);
}
}
cix += clen;
}
}
}
else //colwise
{
//NOTES on special cases
// * matrix-vector not really different from general
// * clen=1 and cols=1 will never be sparse.
if( rlen==1 ) //VECTOR->MATRIX
{
//note: cache-friendly on a but not c; append-only
if( !a.isEmpty(0) ){
int alen = a.size(0); //always pos 0
int[] aix = a.indexes(0);
double[] avals = a.values(0);
for( int j=0; j<alen; j++ ) {
int ci = aix[j]%rows;
int cj = aix[j]/rows;
c.allocate(ci, estnnz, cols);
c.append(ci, cj, avals[j]);
}
}
}
else //GENERAL CASE: MATRIX->MATRIX
{
//note: cache-friendly on a but not c; append&sort, in-place w/o shifts
for( int i=0; i<rlen; i++ ) {
if( a.isEmpty(i) ) continue;
int apos = a.pos(i);
int alen = a.size(i);
int[] aix = a.indexes(i);
double[] avals = a.values(i);
for( int j=apos; j<apos+alen; j++ ) {
//long tmpix because total cells in sparse can be larger than int
long tmpix = (long)aix[j]*rlen+i;
int ci = (int)(tmpix%rows);
int cj = (int)(tmpix/rows);
c.allocate(ci, estnnz, cols);
c.append(ci, cj, avals[j]);
}
}
out.sortSparseRows();
}
}
}
private static void reshapeDenseToSparse( MatrixBlock in, MatrixBlock out, int rows, int cols, boolean rowwise )
{
int rlen = in.rlen;
int clen = in.clen;
//reshape empty block
if( in.denseBlock == null )
return;
//allocate block if necessary
out.allocateSparseRowsBlock(false);
int estnnz = (int) (in.nonZeros/rows);
//sparse reshape
DenseBlock a = in.getDenseBlock();
SparseBlock c = out.sparseBlock;
if( rowwise ) {
//NOTES on special cases
// * vector-matrix, matrix-vector not really different from general
//GENERAL CASE: MATRIX->MATRIX
//note: cache-friendly on a and c; append-only
for( int i=0; i<rlen; i++ ) {
double[] avals = a.values(i);
int aix = a.pos(i);
for( int j=0; j<clen; j++ ) {
double val = avals[aix+j];
if( val != 0 ) {
long cix = (long) i*clen+j;
int ci = (int)(cix / cols);
int cj = (int)(cix % cols);
c.allocate(ci, estnnz, cols);
c.append(ci, cj, val);
}
}
}
}
else //colwise
{
//NOTES on special cases
// * matrix-vector not really different from general
if( rlen==1 ) //VECTOR->MATRIX
{
//note: cache-friendly on a but not c; append-only
double[] avals = a.valuesAt(0);
for( int j=0, aix=0; j<cols; j++ )
for( int i=0; i<rows; i++ ) {
double val = avals[aix++];
if( val != 0 ) {
c.allocate(i, estnnz, cols);
c.append(i, j, val);
}
}
}
else //GENERAL CASE: MATRIX->MATRIX
{
//note: cache-friendly on c but not a; append-only
for( int i=0; i<rows; i++ )
for( int j=0, aix2=i; j<cols; j++, aix2+=rows ) {
int ai = aix2%rlen;
int aj = aix2/rlen;
double val = a.get(ai, aj);
if( val != 0 ) {
c.allocate(i, estnnz, cols);
c.append(i, j, val);
}
}
}
}
}
private static void reshapeSparseToDense( MatrixBlock in, MatrixBlock out, int rows, int cols, boolean rowwise ) {
int rlen = in.rlen;
int clen = in.clen;
//reshape empty block
if( in.sparseBlock == null )
return;
//allocate block if necessary
out.allocateDenseBlock(false);
//sparse/dense reshape
SparseBlock a = in.sparseBlock;
DenseBlock c = out.getDenseBlock();
if( rowwise )
{
//NOTES on special cases
// * vector-matrix, matrix-vector not really different from general
//GENERAL CASE: MATRIX->MATRIX
//note: cache-friendly on a and c
for( int i=0, cix=0; i<rlen; i++, cix+=clen ) {
if( !a.isEmpty(i) ) {
int apos = a.pos(i);
int alen = a.size(i);
int[] aix = a.indexes(i);
double[] avals = a.values(i);
for( int j=apos; j<apos+alen; j++ ) {
int ci = (cix+aix[j]) / cols;
int cj = (cix+aix[j]) % cols;
c.set(ci, cj, avals[j]);
}
}
}
}
else //colwise
{
//NOTES on special cases
// * matrix-vector not really different from general
if( rlen==1 ) //VECTOR->MATRIX
{
//note: cache-friendly on a but not c
double[] cvals = c.valuesAt(0);
if( !a.isEmpty(0) ) {
int apos = a.pos(0);
int alen = a.size(0);
int[] aix = a.indexes(0);
double[] avals = a.values(0);
for( int j=apos; j<apos+alen; j++ ) {
int ci = aix[j]%rows;
int cj = aix[j]/rows;
cvals[ci*cols+cj] = avals[j];
}
}
}
else //GENERAL CASE: MATRIX->MATRIX
{
//note: cache-friendly on a but not c
for( int i=0; i<rlen; i++ ) {
if( a.isEmpty(i) ) continue;
int apos = a.pos(i);
int alen = a.size(i);
int[] aix = a.indexes(i);
double[] avals = a.values(i);
for( int j=apos; j<apos+alen; j++ ) {
int tmpix = aix[j]*rlen+i;
int ci = tmpix%rows;
int cj = tmpix/rows;
c.set(ci, cj, avals[j]);
}
}
}
}
}
///////////////////////////////
// private MR implementation //
///////////////////////////////
private static Collection<MatrixIndexes> computeAllResultBlockIndexes(MatrixIndexes ixin,
DataCharacteristics mcIn, DataCharacteristics mcOut, MatrixBlock in, boolean rowwise, boolean outputEmpty )
{
HashSet<MatrixIndexes> ret = new HashSet<>();
long row_offset = (ixin.getRowIndex()-1)*mcOut.getBlocksize();
long col_offset = (ixin.getColumnIndex()-1)*mcOut.getBlocksize();
long max_row_offset = Math.min(mcIn.getRows(),row_offset+mcIn.getBlocksize())-1;
long max_col_offset = Math.min(mcIn.getCols(),col_offset+mcIn.getBlocksize())-1;
if( rowwise ) {
if( mcIn.getCols() == 1 ) {
MatrixIndexes first = computeResultBlockIndex(new MatrixIndexes(), row_offset, 0, mcIn, mcOut, rowwise);
MatrixIndexes last = computeResultBlockIndex(new MatrixIndexes(), max_row_offset, 0, mcIn, mcOut, rowwise);
createRowwiseIndexes(first, last, mcOut.getNumColBlocks(), ret);
}
else if( in.getNonZeros()<in.getNumRows() && !outputEmpty ) {
createNonZeroIndexes(mcIn, mcOut, in, row_offset, col_offset, rowwise, ret);
}
else {
for( long i=row_offset; i<max_row_offset+1; i++ ) {
MatrixIndexes first = computeResultBlockIndex(new MatrixIndexes(), i, col_offset, mcIn, mcOut, rowwise);
MatrixIndexes last = computeResultBlockIndex(new MatrixIndexes(), i, max_col_offset, mcIn, mcOut, rowwise);
createRowwiseIndexes(first, last, mcOut.getNumColBlocks(), ret);
}
}
}
else{ //colwise
if( mcIn.getRows() == 1 ) {
MatrixIndexes first = computeResultBlockIndex(new MatrixIndexes(), 0, col_offset, mcIn, mcOut, rowwise);
MatrixIndexes last = computeResultBlockIndex(new MatrixIndexes(), 0, max_col_offset, mcIn, mcOut, rowwise);
createColwiseIndexes(first, last, mcOut.getNumRowBlocks(), ret);
}
else if( in.getNonZeros()<in.getNumColumns() && !outputEmpty ) {
createNonZeroIndexes(mcIn, mcOut, in, row_offset, col_offset, rowwise, ret);
}
else {
for( long j=col_offset; j<max_col_offset+1; j++ ) {
MatrixIndexes first = computeResultBlockIndex(new MatrixIndexes(), row_offset, j, mcIn, mcOut, rowwise);
MatrixIndexes last = computeResultBlockIndex(new MatrixIndexes(), max_row_offset, j, mcIn, mcOut, rowwise);
createColwiseIndexes(first, last, mcOut.getNumRowBlocks(), ret);
}
}
}
return ret;
}
private static void createRowwiseIndexes(MatrixIndexes first, MatrixIndexes last, long ncblks, HashSet<MatrixIndexes> ret) {
if( first.getRowIndex()<=0 || first.getColumnIndex()<=0 )
throw new RuntimeException("Invalid computed first index: "+first.toString());
if( last.getRowIndex()<=0 || last.getColumnIndex()<=0 )
throw new RuntimeException("Invalid computed last index: "+last.toString());
//add first row block
ret.add(first);
//add blocks in between first and last
if( !first.equals(last) ) {
boolean fill = first.getRowIndex()==last.getRowIndex()
&& first.getColumnIndex() > last.getColumnIndex();
for( long k1=first.getRowIndex(); k1<=last.getRowIndex(); k1++ ) {
long k2_start = (k1==first.getRowIndex() ? first.getColumnIndex()+1 : 1);
long k2_end = (k1==last.getRowIndex() && !fill) ? last.getColumnIndex()-1 : ncblks;
for( long k2=k2_start; k2<=k2_end; k2++ )
ret.add(new MatrixIndexes(k1,k2));
}
ret.add(last);
}
}
private static void createColwiseIndexes(MatrixIndexes first, MatrixIndexes last, long nrblks, HashSet<MatrixIndexes> ret) {
if( first.getRowIndex()<=0 || first.getColumnIndex()<=0 )
throw new RuntimeException("Invalid computed first index: "+first.toString());
if( last.getRowIndex()<=0 || last.getColumnIndex()<=0 )
throw new RuntimeException("Invalid computed last index: "+last.toString());
//add first row block
ret.add(first);
//add blocks in between first and last
if( !first.equals(last) ) {
boolean fill = first.getColumnIndex()==last.getColumnIndex()
&& first.getRowIndex() > last.getRowIndex();
for( long k1=first.getColumnIndex(); k1<=last.getColumnIndex(); k1++ ) {
long k2_start = ((k1==first.getColumnIndex()) ? first.getRowIndex()+1 : 1);
long k2_end = ((k1==last.getColumnIndex() && !fill) ? last.getRowIndex()-1 : nrblks);
for( long k2=k2_start; k2<=k2_end; k2++ )
ret.add(new MatrixIndexes(k2,k1));
}
ret.add(last);
}
}
private static void createNonZeroIndexes(DataCharacteristics mcIn, DataCharacteristics mcOut,
MatrixBlock in, long row_offset, long col_offset, boolean rowwise, HashSet<MatrixIndexes> ret) {
Iterator<IJV> iter = in.getSparseBlockIterator();
while( iter.hasNext() ) {
IJV cell = iter.next();
ret.add(computeResultBlockIndex(new MatrixIndexes(),
row_offset+cell.getI(), col_offset+cell.getJ(), mcIn, mcOut, rowwise));
}
}
private static Map<MatrixIndexes, MatrixBlock> createAllResultBlocks(Collection<MatrixIndexes> rix, long nnz, DataCharacteristics mcOut) {
return rix.stream().collect(Collectors.toMap(ix -> ix, ix -> createResultBlock(ix, nnz, rix.size(), mcOut)));
}
private static MatrixBlock createResultBlock(MatrixIndexes ix, long nnz, int nBlocks, DataCharacteristics mcOut) {
//compute indexes
long bi = ix.getRowIndex();
long bj = ix.getColumnIndex();
int lbrlen = UtilFunctions.computeBlockSize(mcOut.getRows(), bi, mcOut.getBlocksize());
int lbclen = UtilFunctions.computeBlockSize(mcOut.getCols(), bj, mcOut.getBlocksize());
if( lbrlen<1 || lbclen<1 )
throw new DMLRuntimeException("Computed block dimensions ("+bi+","+bj+" -> "+lbrlen+","+lbclen+") are invalid!");
//create result block
int estnnz = (int) (nnz/nBlocks); //force initial capacity per row to 1, for many blocks
boolean sparse = MatrixBlock.evalSparseFormatInMemory(lbrlen, lbclen, estnnz);
return new MatrixBlock(lbrlen, lbclen, sparse, estnnz);
}
private static void reshapeDense(MatrixBlock in, long row_offset, long col_offset, Map<MatrixIndexes,MatrixBlock> rix,
DataCharacteristics mcIn, DataCharacteristics mcOut, boolean rowwise ) {
if( in.isEmptyBlock(false) )
return;
int rlen = in.rlen;
int clen = in.clen;
double[] a = in.getDenseBlockValues();
//append all values to right blocks
MatrixIndexes ixtmp = new MatrixIndexes();
for( int i=0, aix=0; i<rlen; i++, aix+=clen )
{
long ai = row_offset+i;
for( int j=0; j<clen; j++ )
{
double val = a[ aix+j ];
if( val !=0 ) {
long aj = col_offset+j;
ixtmp = computeResultBlockIndex(ixtmp, ai, aj, mcIn, mcOut, rowwise);
MatrixBlock out = rix.get(ixtmp);
if( out == null )
throw new DMLRuntimeException("Missing result block: "+ixtmp);
ixtmp = computeInBlockIndex(ixtmp, ai, aj, mcIn, mcOut, rowwise);
out.appendValue((int)ixtmp.getRowIndex(),(int)ixtmp.getColumnIndex(), val);
}
}
}
//cleanup for sparse blocks
if( !rowwise && mcIn.getRows() > 1 ) {
rix.values().stream().filter(b -> b.sparse)
.forEach(b -> b.sortSparseRows());
}
}
private static void reshapeSparse(MatrixBlock in, long row_offset, long col_offset, Map<MatrixIndexes,MatrixBlock> rix,
DataCharacteristics mcIn, DataCharacteristics mcOut, boolean rowwise ) {
if( in.isEmptyBlock(false) )
return;
int rlen = in.rlen;
SparseBlock a = in.sparseBlock;
//append all values to right blocks
MatrixIndexes ixtmp = new MatrixIndexes();
for( int i=0; i<rlen; i++ ) {
if( a.isEmpty(i) ) continue;
long ai = row_offset+i;
int apos = a.pos(i);
int alen = a.size(i);
int[] aix = a.indexes(i);
double[] avals = a.values(i);
for( int j=apos; j<apos+alen; j++ ) {
long aj = col_offset+aix[j];
ixtmp = computeResultBlockIndex(ixtmp, ai, aj, mcIn, mcOut, rowwise);
MatrixBlock out = getAllocatedBlock(rix, ixtmp);
ixtmp = computeInBlockIndex(ixtmp, ai, aj, mcIn, mcOut, rowwise);
out.appendValue((int)ixtmp.getRowIndex(),(int)ixtmp.getColumnIndex(), avals[j]);
}
}
//cleanup for sparse blocks
if( !rowwise && mcIn.getRows() > 1 ) {
rix.values().stream().filter(b -> b.sparse)
.forEach(b -> b.sortSparseRows());
}
}
private static MatrixBlock getAllocatedBlock(Map<MatrixIndexes,MatrixBlock> rix, MatrixIndexes ix) {
MatrixBlock out = rix.get(ix);
if( out == null )
throw new DMLRuntimeException("Missing result block: "+ix);
return out;
}
/**
* Assumes internal (0-begin) indices ai, aj as input; computes external block indexes (1-begin)
*
* @param ixout matrix indexes for reuse
* @param ai row index
* @param aj column index
* @param mcIn input matrix characteristics
* @param mcOut output matrix characteristics
* @param rowwise row-wise extraction
* @return matrix indexes
*/
private static MatrixIndexes computeResultBlockIndex(MatrixIndexes ixout, long ai, long aj,
DataCharacteristics mcIn, DataCharacteristics mcOut, boolean rowwise )
{
long tempc = computeGlobalCellIndex(mcIn, ai, aj, rowwise);
long ci = rowwise ? tempc/mcOut.getCols() : tempc%mcOut.getRows();
long cj = rowwise ? tempc%mcOut.getCols() : tempc/mcOut.getRows();
long bci = ci/mcOut.getBlocksize() + 1;
long bcj = cj/mcOut.getBlocksize() + 1;
return ixout.setIndexes(bci, bcj);
}
private static MatrixIndexes computeInBlockIndex(MatrixIndexes ixout, long ai, long aj,
DataCharacteristics mcIn, DataCharacteristics mcOut, boolean rowwise )
{
long tempc = computeGlobalCellIndex(mcIn, ai, aj, rowwise);
long ci = rowwise ? (tempc/mcOut.getCols())%mcOut.getBlocksize() :
(tempc%mcOut.getRows())%mcOut.getBlocksize();
long cj = rowwise ? (tempc%mcOut.getCols())%mcOut.getBlocksize() :
(tempc/mcOut.getRows())%mcOut.getBlocksize();
return ixout.setIndexes(ci, cj);
}
private static long computeGlobalCellIndex(DataCharacteristics mcIn, long ai, long aj, boolean rowwise) {
return rowwise ? ai*mcIn.getCols()+aj : ai+mcIn.getRows()*aj;
}
private static MatrixBlock removeEmptyRows(MatrixBlock in, MatrixBlock ret, MatrixBlock select, boolean emptyReturn) {
final int m = in.rlen;
final int n = in.clen;
boolean[] flags = null;
int rlen2 = 0;
//Step 0: special case handling
if( SHALLOW_COPY_REORG && SPARSE_OUTPUTS_IN_CSR
&& in.sparse && !in.isEmptyBlock(false)
&& select==null && in.sparseBlock instanceof SparseBlockCSR
&& in.nonZeros < Integer.MAX_VALUE )
{
//create the output in csr format with a shallow copy of arrays for column
//indexes and values (heuristic: shallow copy better than copy to dense)
SparseBlockCSR sblock = (SparseBlockCSR) in.sparseBlock;
int lrlen = 0;
for( int i=0; i<m; i++ )
lrlen += sblock.isEmpty(i) ? 0 : 1;
//check for sparse output representation, otherwise
//fall back to default pass to allocate in dense format
if( MatrixBlock.evalSparseFormatInMemory(lrlen, n, in.nonZeros) ) {
int[] rptr = new int[lrlen+1];
for( int i=0, j=0, pos=0; i<m; i++ )
if( !sblock.isEmpty(i) ) {
pos += sblock.size(i);
rptr[++j] = pos;
}
ret.reset(lrlen, in.clen, true);
ret.sparseBlock = new SparseBlockCSR(rptr,
sblock.indexes(), sblock.values(), (int)in.nonZeros);
ret.nonZeros = in.nonZeros;
return ret;
}
}
//Step 1: scan block and determine non-empty rows
if(select == null)
{
flags = new boolean[ m ]; //false
if( in.sparse ) { //SPARSE
SparseBlock a = in.sparseBlock;
for ( int i=0; i < m; i++ )
rlen2 += (flags[i] = !a.isEmpty(i)) ? 1 : 0;
}
else { //DENSE
DenseBlock a = in.getDenseBlock();
for( int i=0; i<m; i++ ) {
double[] avals = a.values(i);
int aix = a.pos(i);
for(int j=0; j<n; j++)
if( avals[aix+j] != 0 ) {
flags[i] = true;
rlen2++;
//early abort for current row
break;
}
}
}
}
else {
flags = DataConverter.convertToBooleanVector(select);
rlen2 = (int)select.getNonZeros();
}
//Step 2: reset result and copy rows
//dense stays dense if correct input representation (but robust for any input),
//sparse might be dense/sparse
rlen2 = Math.max(rlen2, emptyReturn ? 1 : 0); //ensure valid output
boolean sp = MatrixBlock.evalSparseFormatInMemory(rlen2, n, in.nonZeros);
ret.reset(rlen2, n, sp);
if( in.isEmptyBlock(false) )
return ret;
if( SHALLOW_COPY_REORG && m == rlen2 ) {
ret.sparse = in.sparse;
if( ret.sparse )
ret.sparseBlock = in.sparseBlock;
else
ret.denseBlock = in.denseBlock;
}
else if( in.sparse ) //* <- SPARSE
{
//note: output dense or sparse
for( int i=0, cix=0; i<m; i++ )
if( flags[i] ) {
ret.appendRow(cix++, in.sparseBlock.get(i),
!SHALLOW_COPY_REORG);
}
}
else if( !in.sparse && !ret.sparse ) //DENSE <- DENSE
{
ret.allocateDenseBlock();
DenseBlock a = in.getDenseBlock();
DenseBlock c = ret.getDenseBlock();
for( int i=0, ci=0; i<m; i++ )
if( flags[i] ) {
System.arraycopy(a.values(i),
a.pos(i), c.values(ci), c.pos(ci), n);
ci++; //target row index
}
}
else //SPARSE <- DENSE
{
ret.allocateSparseRowsBlock();
DenseBlock a = in.getDenseBlock();
for( int i=0, ci=0; i<m; i++ )
if( flags[i] ) {
double[] avals = a.values(i);
int aix = a.pos(i);
for( int j=0; j<n; j++ )
ret.appendValue(ci, j, avals[aix+j]);
ci++;
}
}
//check sparsity
ret.nonZeros = (select==null) ?
in.nonZeros : ret.recomputeNonZeros();
ret.examSparsity();
return ret;
}
private static MatrixBlock removeEmptyColumns(MatrixBlock in, MatrixBlock ret, MatrixBlock select, boolean emptyReturn) {
final int m = in.rlen;
final int n = in.clen;
//Step 1: scan block and determine non-empty columns
//(we optimized for cache-friendly behavior and hence don't do early abort)
boolean[] flags = null;
if (select == null)
{
flags = new boolean[ n ]; //false
if( in.sparse ) { //SPARSE
SparseBlock a = in.sparseBlock;
for( int i=0; i<m; i++ )
if ( !a.isEmpty(i) ) {
int apos = a.pos(i);
int alen = a.size(i);
int[] aix = a.indexes(i);
for( int j=apos; j<apos+alen; j++ )
flags[ aix[j] ] = true;
}
}
else { //DENSE
DenseBlock a = in.getDenseBlock();
for( int i=0; i<m; i++ ) {
double[] avals = a.values(i);
int aix = a.pos(i);
for( int j=0; j<n; j++ )
flags[j] |= (avals[aix+j] != 0);
}
}
}
else {
flags = DataConverter.convertToBooleanVector(select);
}
//Step 2: determine number of columns
int clen2 = 0;
for( int j=0; j<n; j++ ) {
clen2 += flags[j] ? 1 : 0;
}
//Step 3: reset result and copy columns
//dense stays dense if correct input representation (but robust for any input),
// sparse might be dense/sparse
clen2 = Math.max(clen2, emptyReturn ? 1 : 0); //ensure valid output
boolean sp = MatrixBlock.evalSparseFormatInMemory(m, clen2, in.nonZeros);
ret.reset(m, clen2, sp);
if( in.isEmptyBlock(false) )
return ret;
if( SHALLOW_COPY_REORG && n == clen2 ) {
//quick path: shallow copy if unmodified
ret.sparse = in.sparse;
if( ret.sparse )
ret.sparseBlock = in.sparseBlock;
else
ret.denseBlock = in.denseBlock;
}
else
{
//create mapping of flags to target indexes
int[] cix = new int[n];
for( int j=0, pos=0; j<n; j++ ) {
if( flags[j] )
cix[j] = pos++;
}
//deep copy of modified outputs
if( in.sparse ) //* <- SPARSE
{
//note: output dense or sparse
SparseBlock a = in.sparseBlock;
for( int i=0; i<m; i++ )
if ( !a.isEmpty(i) ) {
int apos = a.pos(i);
int alen = a.size(i);
int[] aix = a.indexes(i);
double[] avals = a.values(i);
for( int j=apos; j<apos+alen; j++ )
if( flags[aix[j]] )
ret.appendValue(i, cix[aix[j]], avals[j]);
}
}
else if( !in.sparse && !ret.sparse ) { //DENSE <- DENSE
ret.allocateDenseBlock();
DenseBlock a = in.getDenseBlock();
DenseBlock c = ret.getDenseBlock();
for( int i=0; i<m; i++ ) {
double[] avals = a.values(i);
double[] cvals = c.values(i);
int aix = a.pos(i);
int lcix = c.pos(i);
for( int j=0; j<n; j++ )
if( flags[j] )
cvals[ lcix+cix[j] ] = avals[aix+j];
}
}
else { //SPARSE <- DENSE
ret.allocateSparseRowsBlock();
DenseBlock a = in.getDenseBlock();
for( int i=0; i<m; i++ ) {
double[] avals = a.values(i);
int aix = a.pos(i);
for( int j=0; j<n; j++ ) {
double aval = avals[aix+j];
if( flags[j] && aval!=0 )
ret.appendValue(i, cix[j], aval);
}
}
}
}
//check sparsity
ret.nonZeros = (select==null) ?
in.nonZeros : ret.recomputeNonZeros();
ret.examSparsity();
return ret;
}
private static MatrixBlock rexpandRows(MatrixBlock in, MatrixBlock ret, int max, boolean cast, boolean ignore) {
//set meta data
final int rlen = max;
final int clen = in.rlen;
final long nnz = in.nonZeros;
boolean sp = MatrixBlock.evalSparseFormatInMemory(rlen, clen, nnz);
ret.reset(rlen, clen, sp);
//setup temporary array for 'buffered append w/ sorting' in order
//to mitigate performance issues due to random row access for large m
final int blksize = 1024*1024; //max 12MB
int[] tmpi = new int[Math.min(blksize,clen)];
double[] tmp = new double[Math.min(blksize,clen)];
//expand input vertically (input vector likely dense
//but generic implementation for general case)
for( int i=0; i<clen; i+=blksize )
{
//create sorted block indexes (append buffer)
int len = Math.min(blksize, clen-i);
copyColVector(in, i, tmp, tmpi, len);
SortUtils.sortByValue(0, len, tmp, tmpi);
//process current append buffer
for( int j=0; j<len; j++ )
{
//get value and cast if necessary (table)
double val = tmp[j];
if( cast )
val = UtilFunctions.toLong(val);
//handle invalid values if not to be ignored
if( !ignore && val<=0 )
throw new DMLRuntimeException("Invalid input value <= 0 for ignore=false: "+val);
//set expanded value if matching
//note: tmpi populated with i+j indexes, then sorted
if( val == Math.floor(val) && val >= 1 && val <= max )
ret.appendValue((int)(val-1), tmpi[j], 1);
}
}
//ensure valid output sparse representation
//(necessary due to cache-conscious processing w/ unstable sort)
if( ret.isInSparseFormat() )
ret.sortSparseRows();
return ret;
}
private static MatrixBlock rexpandColumns(MatrixBlock in, MatrixBlock ret, int max, boolean cast, boolean ignore, int k) {
//set meta data
final int rlen = in.rlen;
final int clen = max;
final long nnz = in.nonZeros;
boolean sp = MatrixBlock.evalSparseFormatInMemory(rlen, clen, nnz);
ret.reset(rlen, clen, sp);
ret.allocateBlock();
//execute rexpand columns
long rnnz = 0; //real nnz (due to cutoff max)
if( k <= 1 || in.getNumRows() <= PAR_NUMCELL_THRESHOLD
|| (sp && SPARSE_OUTPUTS_IN_CSR) ) {
rnnz = rexpandColumns(in, ret, max, cast, ignore, 0, rlen);
}
else {
try {
ExecutorService pool = CommonThreadPool.get(k);
ArrayList<RExpandColsTask> tasks = new ArrayList<>();
int blklen = (int)(Math.ceil((double)rlen/k/8));
for( int i=0; i<8*k & i*blklen<rlen; i++ )
tasks.add(new RExpandColsTask(in, ret,
max, cast, ignore, i*blklen, Math.min((i+1)*blklen, rlen)));
List<Future<Long>> taskret = pool.invokeAll(tasks);
pool.shutdown();
for( Future<Long> task : taskret )
rnnz += task.get();
}
catch(Exception ex) {
throw new DMLRuntimeException(ex);
}
}
//post-processing
ret.setNonZeros(rnnz);
return ret;
}
private static long rexpandColumns(MatrixBlock in, MatrixBlock ret, int max, boolean cast, boolean ignore, int rl, int ru) {
//initialize auxiliary data structures
int lnnz = 0;
int[] cix = null;
if( ret.sparse && SPARSE_OUTPUTS_IN_CSR ) {
cix = new int[in.rlen];
Arrays.fill(cix, -1);
}
//expand input horizontally (input vector likely dense
//but generic implementation for general case)
DenseBlock cd = ret.getDenseBlock();
SparseBlock cs = ret.getSparseBlock();
for( int i=rl; i<ru; i++ )
{
//get value and cast if necessary (table)
double val = in.quickGetValue(i, 0);
if( cast )
val = UtilFunctions.toLong(val);
//handle invalid values if not to be ignored
if( !ignore && val<=0 )
throw new DMLRuntimeException("Invalid input value <= 0 for ignore=false: "+val);
//set expanded value if matching
if( val == Math.floor(val) && val >= 1 && val <= max ) {
//update target without global nnz maintenance
if( cix != null ) {
cix[i] = (int)(val-1);
}
else if( ret.sparse ) {
cs.allocate(i, 1);
cs.append(i, (int)(val-1), 1);
}
else
cd.set(i, (int)(val-1), 1);
lnnz ++;
}
}
//init CSR block once to avoid repeated updates of row pointers on append
if( cix != null )
ret.sparseBlock = new SparseBlockCSR(in.rlen, lnnz, cix);
//recompute nnz of partition
return ret.setNonZeros(lnnz);
}
private static void copyColVector( MatrixBlock in, int ixin, double[] tmp, int[] tmpi, int len)
{
//copy value array from input matrix
if( in.isEmptyBlock(false) ) {
Arrays.fill(tmp, 0, len, 0);
}
else if( in.sparse ){ //SPARSE
for( int i=0; i<len; i++ )
tmp[i] = in.quickGetValue(ixin+i, 0);
}
else { //DENSE
System.arraycopy(in.getDenseBlockValues(), ixin, tmp, 0, len);
}
//init index array
for( int i=0; i<len; i++ )
tmpi[i] = ixin + i;
}
/**
* Utility method for in-place transformation of an ascending sorted
* order into a descending sorted order. This method assumes dense
* column vectors as input.
*
* @param m1 matrix
*/
private static void sortReverseDense( MatrixBlock m1 ) {
int rlen = m1.rlen;
double[] a = m1.getDenseBlockValues();
for( int i=0; i<rlen/2; i++ ) {
double tmp = a[i];
a[i] = a[rlen - i -1];
a[rlen - i - 1] = tmp;
}
}
private static void sortReverseDense( int[] a ) {
int rlen = a.length;
for( int i=0; i<rlen/2; i++ ) {
int tmp = a[i];
a[i] = a[rlen - i -1];
a[rlen - i - 1] = tmp;
}
}
private static void sortReverseDense( double[] a ) {
int rlen = a.length;
for( int i=0; i<rlen/2; i++ ) {
double tmp = a[i];
a[i] = a[rlen - i -1];
a[rlen - i - 1] = tmp;
}
}
// Method to merge all blocks of a specified length.
private static void mergeSortedBlocks(int blockLength, int[] valueIndexes, double[] values, int k){
// Check if the blocklength is bigger than the size of the values
// if it is smaller then merge the blocks, if not merging is done
int vlen = values.length;
int mergeBlockSize = blockLength * 2;
if (mergeBlockSize <= vlen + blockLength){
try {
ExecutorService pool = CommonThreadPool.get(k);
ArrayList<MergeTask> tasks = new ArrayList<>();
for( int i=0; i*mergeBlockSize<vlen; i++ ){
int start = i*mergeBlockSize;
if (start + blockLength < vlen){
int stop = Math.min(vlen , (i+1)*mergeBlockSize);
tasks.add(new MergeTask(start, stop, blockLength, valueIndexes, values));
}
}
CommonThreadPool.invokeAndShutdown(pool, tasks);
// recursive merge larger blocks.
mergeSortedBlocks(mergeBlockSize, valueIndexes, values, k);
}
catch(Exception ex) {
throw new DMLRuntimeException(ex);
}
}
}
private static void sortBySecondary(int rl, int ru, double[] values, int[] vix, MatrixBlock in, int[] by, int off) {
//find runs of equal values in current offset and index range
//replace value by next column, sort, and recurse until single value
for( int i=rl; i<ru-1; i++ ) {
double tmp = values[i];
//determine run of equal values
int len = 0;
while( i+len+1<ru && tmp==values[i+len+1] )
len++;
//temp value replacement and recursive sort
if( len > 0 ) {
double old = values[i];
//extract values of next column
for(int j=i; j<i+len+1; j++)
values[j] = in.quickGetValue(vix[j], by[off]-1);
//sort values, incl recursive decent
SortUtils.sortByValue(i, i+len+1, values, vix);
if( off+1 < by.length )
sortBySecondary(i, i+len+1, values, vix, in, by, off+1);
//reset values of previous level
Arrays.fill(values, i, i+len+1, old);
i += len; //skip processed run
}
}
}
private static void sortIndexesStable(int rl, int ru, double[] values, int[] vix, MatrixBlock in, int[] by, int off) {
for( int i=rl; i<ru-1; i++ ) {
double tmp = values[i];
//determine run of equal values
int len = 0;
while( i+len+1<ru && tmp==values[i+len+1] )
len++;
//temp value replacement and recursive decent
if( len > 0 ) {
if( off < by.length ) {
//extract values of next column
for(int j=i; j<i+len+1; j++)
values[j] = in.quickGetValue(vix[j], by[off]-1);
sortIndexesStable(i, i+len+1, values, vix, in, by, off+1);
}
else //unstable sort of run indexes (equal value guaranteed)
Arrays.sort(vix, i, i+len+1);
i += len; //skip processed run
}
}
}
private static boolean isValidSortByList(int[] by, int clen) {
if( by == null || by.length==0 || by.length>clen )
return false;
for(int i=0; i<by.length; i++)
if( by[i] <= 0 || clen < by[i])
return false;
return true;
}
@SuppressWarnings("unused")
private static void countAgg( int[] c, int[] ai, final int len )
{
final int bn = len%8;
//compute rest, not aligned to 8-block
for( int i=0; i<bn; i++ )
c[ ai[i] ]++;
//unrolled 8-block (for better instruction level parallelism)
for( int i=bn; i<len; i+=8 )
{
c[ ai[ i+0 ] ] ++;
c[ ai[ i+1 ] ] ++;
c[ ai[ i+2 ] ] ++;
c[ ai[ i+3 ] ] ++;
c[ ai[ i+4 ] ] ++;
c[ ai[ i+5 ] ] ++;
c[ ai[ i+6 ] ] ++;
c[ ai[ i+7 ] ] ++;
}
}
private static void countAgg( int[] c, int[] aix, int ai, final int len )
{
final int bn = len%8;
//compute rest, not aligned to 8-block
for( int i=ai; i<ai+bn; i++ )
c[ aix[i] ]++;
//unrolled 8-block (for better instruction level parallelism)
for( int i=ai+bn; i<ai+len; i+=8 )
{
c[ aix[ i+0 ] ] ++;
c[ aix[ i+1 ] ] ++;
c[ aix[ i+2 ] ] ++;
c[ aix[ i+3 ] ] ++;
c[ aix[ i+4 ] ] ++;
c[ aix[ i+5 ] ] ++;
c[ aix[ i+6 ] ] ++;
c[ aix[ i+7 ] ] ++;
}
}
@SuppressWarnings("unused")
private static class AscRowComparator implements Comparator<Integer>
{
private MatrixBlock _mb = null;
private int _col = -1;
public AscRowComparator( MatrixBlock mb, int col )
{
_mb = mb;
_col = col;
}
@Override
public int compare(Integer arg0, Integer arg1)
{
double val0 = _mb.quickGetValue(arg0, _col);
double val1 = _mb.quickGetValue(arg1, _col);
return (val0 < val1 ? -1 : (val0 == val1 ? 0 : 1));
}
}
@SuppressWarnings("unused")
private static class DescRowComparator implements Comparator<Integer>
{
private MatrixBlock _mb = null;
private int _col = -1;
public DescRowComparator( MatrixBlock mb, int col )
{
_mb = mb;
_col = col;
}
@Override
public int compare(Integer arg0, Integer arg1)
{
double val0 = _mb.quickGetValue(arg0, _col);
double val1 = _mb.quickGetValue(arg1, _col);
return (val0 > val1 ? -1 : (val0 == val1 ? 0 : 1));
}
}
private static class TransposeTask implements Callable<Object>
{
private MatrixBlock _in = null;
private MatrixBlock _out = null;
private boolean _row = false;
private int _rl = -1;
private int _ru = -1;
private int[] _cnt = null;
protected TransposeTask(MatrixBlock in, MatrixBlock out, boolean row, int rl, int ru, int[] cnt) {
_in = in;
_out = out;
_row = row;
_rl = rl;
_ru = ru;
_cnt = cnt;
}
@Override
public Object call() {
int rl = _row ? _rl : 0;
int ru = _row ? _ru : _in.rlen;
int cl = _row ? 0 : _rl;
int cu = _row ? _in.clen : _ru;
//execute transpose operation
if( !_in.sparse && !_out.sparse )
transposeDenseToDense( _in, _out, rl, ru, cl, cu );
else if( _in.sparse && _out.sparse )
transposeSparseToSparse( _in, _out, rl, ru, cl, cu, _cnt );
else if( _in.sparse )
transposeSparseToDense( _in, _out, rl, ru, cl, cu );
else
throw new DMLRuntimeException("Unsupported multi-threaded dense-sparse transpose.");
return null;
}
}
private static class CountNnzTask implements Callable<int[]>
{
private MatrixBlock _in = null;
private int _rl = -1;
private int _ru = -1;
protected CountNnzTask(MatrixBlock in, int rl, int ru) {
_in = in;
_rl = rl;
_ru = ru;
}
@Override
public int[] call() {
return countNnzPerColumn(_in, _rl, _ru);
}
}
private static class RExpandColsTask implements Callable<Long>
{
private final MatrixBlock _in;
private final MatrixBlock _out;
private final int _max;
private final boolean _cast;
private final boolean _ignore;
private final int _rl;
private final int _ru;
protected RExpandColsTask(MatrixBlock in, MatrixBlock out, int max, boolean cast, boolean ignore, int rl, int ru) {
_in = in;
_out = out;
_max = max;
_cast = cast;
_ignore = ignore;
_rl = rl;
_ru = ru;
}
@Override
public Long call() {
return rexpandColumns(_in, _out, _max, _cast, _ignore, _rl, _ru);
}
}
private static class SortTask implements Callable<Object>
{
private final int _start;
private final int _end;
private final int[] _indexes;
private final double[] _values;
protected SortTask(int start, int end, int[] indexes, double[] values){
_start = start;
_end = end;
_indexes = indexes;
_values = values;
}
@Override
public Long call(){
SortUtils.sortByValue(_start, _end, _values, _indexes);
return 1l;
}
}
private static class MergeTask implements Callable<Object>
{
private final int _start;
private final int _end;
private final int _blockSize;
private final int[] _indexes;
private final double[] _values;
protected MergeTask(int start, int end, int blockSize, int[] indexes, double[] values){
_start = start;
_end = end;
_blockSize = blockSize;
_indexes = indexes;
_values = values;
}
@Override
public Long call(){
// Find the middle index of the two merge blocks
int middle = _start + _blockSize;
// Early return if edge case
if (middle == _end) return 1l;
// Pointer left side merge.
int pointlIndex = middle -1;
// Pointer to merge index.
int positionToAssign = _end - 1;
// Make copy of entire right side so that we use left side directly as output.
// Results in worst case (and most cases) allocation of extra array of half input size.
int[] rhsCopy = Arrays.copyOfRange(_indexes, middle, _end);
double[] rhsCopyV = Arrays.copyOfRange(_values, middle, _end);
int pointrIndex = _end - middle - 1;
while (positionToAssign >= _start && pointrIndex >= 0) {
if( pointrIndex < 0
|| ( pointlIndex >= _start && _values[pointlIndex] > rhsCopyV[pointrIndex]) )
{
_values[positionToAssign] = _values[pointlIndex];
_indexes[positionToAssign] = _indexes[pointlIndex];
pointlIndex--;
positionToAssign--;
}
else {
_values[positionToAssign] = rhsCopyV[pointrIndex];
_indexes[positionToAssign] = rhsCopy[pointrIndex];
positionToAssign--;
pointrIndex--;
}
}
return 1l;
}
}
private static class CopyTask implements Callable<Object>
{
private final MatrixBlock _in;
private final MatrixBlock _out;
private final int[] _vix;
private final int _rl;
private final int _ru;
protected CopyTask(MatrixBlock in, MatrixBlock out, int[] vix, int rl, int ru) {
_in = in;
_out = out;
_vix = vix;
_rl = rl;
_ru = ru;
}
@Override
public Object call() {
int clen = _in.clen;
if( !_out.sparse ) { //DENSE
DenseBlock a = _in.getDenseBlock();
DenseBlock c = _out.getDenseBlock();
for( int i=_rl; i<_ru; i++ )
System.arraycopy(a.values(_vix[i]), a.pos(_vix[i]), c.values(i), c.pos(i), clen);
}
else { //SPARSE
for( int i=_rl; i<_ru; i++ )
if( !_in.sparseBlock.isEmpty(_vix[i]) )
_out.sparseBlock.set(i, _in.sparseBlock.get(_vix[i]),
!SHALLOW_COPY_REORG); //row remains unchanged
}
return null;
}
}
}