| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.sysml.runtime.compress; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.Externalizable; |
| import java.io.IOException; |
| import java.io.ObjectInput; |
| import java.io.ObjectOutput; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.PriorityQueue; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Future; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.commons.math3.random.Well1024a; |
| import org.apache.log4j.Level; |
| import org.apache.log4j.Logger; |
| import org.apache.sysml.hops.OptimizerUtils; |
| import org.apache.sysml.lops.MMTSJ.MMTSJType; |
| import org.apache.sysml.lops.MapMultChain.ChainType; |
| import org.apache.sysml.runtime.DMLRuntimeException; |
| import org.apache.sysml.runtime.compress.ColGroup.ColGroupRowIterator; |
| import org.apache.sysml.runtime.compress.ColGroup.CompressionType; |
| import org.apache.sysml.runtime.compress.cocode.PlanningCoCoder; |
| import org.apache.sysml.runtime.compress.estim.CompressedSizeEstimator; |
| import org.apache.sysml.runtime.compress.estim.CompressedSizeInfo; |
| import org.apache.sysml.runtime.compress.estim.SizeEstimatorFactory; |
| import org.apache.sysml.runtime.compress.utils.ConverterUtils; |
| import org.apache.sysml.runtime.compress.utils.LinearAlgebraUtils; |
| import org.apache.sysml.runtime.controlprogram.caching.CacheBlock; |
| import org.apache.sysml.runtime.controlprogram.caching.MatrixObject.UpdateType; |
| import org.apache.sysml.runtime.controlprogram.parfor.stat.Timing; |
| import org.apache.sysml.runtime.functionobjects.Builtin; |
| import org.apache.sysml.runtime.functionobjects.Builtin.BuiltinCode; |
| import org.apache.sysml.runtime.functionobjects.KahanFunction; |
| import org.apache.sysml.runtime.functionobjects.KahanPlus; |
| import org.apache.sysml.runtime.functionobjects.KahanPlusSq; |
| import org.apache.sysml.runtime.functionobjects.Multiply; |
| import org.apache.sysml.runtime.functionobjects.ReduceAll; |
| import org.apache.sysml.runtime.functionobjects.ReduceCol; |
| import org.apache.sysml.runtime.instructions.cp.CM_COV_Object; |
| import org.apache.sysml.runtime.instructions.cp.KahanObject; |
| import org.apache.sysml.runtime.instructions.cp.ScalarObject; |
| import org.apache.sysml.runtime.matrix.data.CTableMap; |
| import org.apache.sysml.runtime.matrix.data.IJV; |
| import org.apache.sysml.runtime.matrix.data.LibMatrixBincell; |
| import org.apache.sysml.runtime.matrix.data.LibMatrixReorg; |
| import org.apache.sysml.runtime.matrix.data.MatrixBlock; |
| import org.apache.sysml.runtime.matrix.data.MatrixIndexes; |
| import org.apache.sysml.runtime.matrix.data.MatrixValue; |
| import org.apache.sysml.runtime.matrix.data.RandomMatrixGenerator; |
| import org.apache.sysml.runtime.matrix.data.SparseBlock; |
| import org.apache.sysml.runtime.matrix.data.SparseRow; |
| import org.apache.sysml.runtime.matrix.data.SparseRowVector; |
| import org.apache.sysml.runtime.matrix.mapred.IndexedMatrixValue; |
| import org.apache.sysml.runtime.matrix.operators.AggregateBinaryOperator; |
| import org.apache.sysml.runtime.matrix.operators.AggregateOperator; |
| import org.apache.sysml.runtime.matrix.operators.AggregateTernaryOperator; |
| import org.apache.sysml.runtime.matrix.operators.AggregateUnaryOperator; |
| import org.apache.sysml.runtime.matrix.operators.BinaryOperator; |
| import org.apache.sysml.runtime.matrix.operators.CMOperator; |
| import org.apache.sysml.runtime.matrix.operators.COVOperator; |
| import org.apache.sysml.runtime.matrix.operators.Operator; |
| import org.apache.sysml.runtime.matrix.operators.QuaternaryOperator; |
| import org.apache.sysml.runtime.matrix.operators.ReorgOperator; |
| import org.apache.sysml.runtime.matrix.operators.ScalarOperator; |
| import org.apache.sysml.runtime.matrix.operators.TernaryOperator; |
| import org.apache.sysml.runtime.matrix.operators.UnaryOperator; |
| import org.apache.sysml.runtime.util.CommonThreadPool; |
| import org.apache.sysml.runtime.util.IndexRange; |
| import org.apache.sysml.runtime.util.SortUtils; |
| |
| /** |
| * Experimental version of MatrixBlock that allows a compressed internal |
| * representation. |
| */ |
| public class CompressedMatrixBlock extends MatrixBlock implements Externalizable |
| { |
private static final long serialVersionUID = 7319972089143154057L;

// ---- internal configuration ----
/** If set, compression works on a transposed copy of the input block. */
public static final boolean TRANSPOSE_INPUT = true;
public static final boolean MATERIALIZE_ZEROS = false;
public static final long MIN_PAR_AGG_THRESHOLD = 16*1024*1024; //16MB
/** If set, compression additionally records the estimated size in the statistics. */
public static final boolean INVESTIGATE_ESTIMATES = false;
/** Enables the selection of DDC column groups (mutable switch). */
public static boolean ALLOW_DDC_ENCODING = true;
/** Enables a joint dictionary across single-column DDC1 groups. */
public static final boolean ALLOW_SHARED_DDC1_DICTIONARY = true;
/** Local debug flag; if set, the log level of the compress package is forced at class load. */
private static final boolean LDEBUG = true;
/** Log level applied under LDEBUG (DEBUG/TRACE for details). */
private static final Level LDEBUG_LEVEL = Level.INFO;

private static final Log LOG = LogFactory.getLog(CompressedMatrixBlock.class.getName());

static {
	// for internal debugging only
	if( LDEBUG ) {
		Logger compressLogger = Logger.getLogger("org.apache.sysml.runtime.compress");
		compressLogger.setLevel(LDEBUG_LEVEL);
	}
}

/** Column groups of the compressed representation; null while uncompressed. */
protected ArrayList<ColGroup> _colGroups = null;
/** Statistics of the last compression attempt; null before the first attempt. */
protected CompressionStatistics _stats = null;
/** Whether single-column DDC1 groups share one dictionary. */
protected boolean _sharedDDC1Dict = false;
| |
/**
 * Default constructor; initializes the underlying matrix block with
 * zero rows, zero columns, and the sparse flag set.
 */
public CompressedMatrixBlock() {
	super(0, 0, true);
}
| |
/**
 * Main constructor for building a block from scratch; all arguments
 * are handed to the matrix block super constructor.
 *
 * @param rl     number of rows in the block
 * @param cl     number of columns in the block
 * @param sparse true if the UNCOMPRESSED representation of the block
 *               should be sparse
 */
public CompressedMatrixBlock(int rl, int cl, boolean sparse) {
	super(rl, cl, sparse);
}
| |
/**
 * "Copy" constructor that wraps the uncompressed contents of a
 * conventional block. Does <b>not</b> compress the block, and shares
 * the data of the given block instead of copying it.
 *
 * @param mb matrix block whose dimensions, format, data reference, and
 *           number of non-zeros are taken over
 */
public CompressedMatrixBlock(MatrixBlock mb) {
	super(mb.getNumRows(), mb.getNumColumns(), mb.isInSparseFormat());

	nonZeros = mb.getNonZeros();

	//shallow copy only (the deep copy happens on compression, which
	//prevents an unnecessary copy here)
	if( !isInSparseFormat() )
		denseBlock = mb.getDenseBlock();
	else
		sparseBlock = mb.getSparseBlock();
}
| |
| /** |
| * Obtain the column groups. |
| * |
| * @return the column groups constructed by the compression process. |
| * |
| */ |
| public ArrayList<ColGroup> getColGroups() { |
| return _colGroups; |
| } |
| |
| public int getNumColGroups() { |
| return _colGroups.size(); |
| } |
| |
| /** |
| * Obtain whether this block is in compressed form or not. |
| * |
| * @return true if this block is in compressed form; false if the block has |
| * not yet been compressed |
| */ |
| public boolean isCompressed() { |
| return (_colGroups != null); |
| } |
| |
| public boolean isSingleUncompressedGroup(){ |
| return (_colGroups!=null && _colGroups.size()==1 |
| && _colGroups.get(0) instanceof ColGroupUncompressed); |
| } |
| |
| private void allocateColGroupList() { |
| _colGroups = new ArrayList<>(); |
| } |
| |
| @Override |
| public boolean isEmptyBlock(boolean safe) { |
| if( !isCompressed() ) |
| return super.isEmptyBlock(safe); |
| return (_colGroups == null || getNonZeros()==0); |
| } |
| |
| /** |
| * Compress the contents of this matrix block. After compression, the |
| * uncompressed data is discarded. Attempts to update this block after |
| * calling this method currently result in INCORRECT BEHAVIOR, something |
| * which should be fixed if we move ahead with this compression strategy. |
| * |
| * +per column sparsity |
| * |
| * @return compressed matrix block or original block if incompressible |
| */ |
| public MatrixBlock compress() { |
| //default sequential execution |
| return compress(1); |
| } |
| |
| /** |
| * Compress block. |
| * |
| * @param k number of threads |
| * @return compressed matrix block or original block if incompressible |
| */ |
| public MatrixBlock compress(int k) { |
| //check for redundant compression |
| if( isCompressed() ){ |
| throw new DMLRuntimeException("Redundant compression, block already compressed."); |
| } |
| |
| Timing time = new Timing(true); |
| _stats = new CompressionStatistics(); |
| |
| // SAMPLE-BASED DECISIONS: |
| // Decisions such as testing if a column is amenable to bitmap |
| // compression or evaluating co-coding potentionls are made based on a |
| // subset of the rows. For large datasets, sampling might take a |
| // significant amount of time. So, we generate only one sample and use |
| // it for the entire compression process. |
| |
| //prepare basic meta data and deep copy / transpose input |
| final int numRows = getNumRows(); |
| final int numCols = getNumColumns(); |
| final boolean sparse = isInSparseFormat(); |
| MatrixBlock rawblock = !TRANSPOSE_INPUT ? new MatrixBlock(this) : |
| LibMatrixReorg.transpose(this, new MatrixBlock(numCols, numRows, sparse), k); |
| |
| //construct sample-based size estimator |
| CompressedSizeEstimator bitmapSizeEstimator = |
| SizeEstimatorFactory.getSizeEstimator(rawblock, numRows); |
| |
| // PHASE 1: Classify columns by compression type |
| // We start by determining which columns are amenable to compression |
| List<Integer> colsC = new ArrayList<>(); |
| List<Integer> colsUC = new ArrayList<>(); |
| HashMap<Integer, Double> compRatios = new HashMap<>(); |
| |
| // Classify columns according to ratio (size uncompressed / size compressed), |
| // where a column is compressible if ratio > 1. |
| CompressedSizeInfo[] sizeInfos = (k > 1) ? |
| computeCompressedSizeInfos(bitmapSizeEstimator, numCols, k) : |
| computeCompressedSizeInfos(bitmapSizeEstimator, numCols); |
| long nnzUC = 0; |
| for (int col = 0; col < numCols; col++) { |
| double uncompSize = getUncompressedSize(numRows, 1, |
| OptimizerUtils.getSparsity(numRows, 1, sizeInfos[col].getEstNnz())); |
| double compRatio = uncompSize / sizeInfos[col].getMinSize(); |
| if( compRatio > 1 ) { |
| colsC.add(col); |
| compRatios.put(col, compRatio); |
| } |
| else { |
| colsUC.add(col); |
| nnzUC += sizeInfos[col].getEstNnz(); |
| } |
| } |
| |
| // correction of column classification (reevaluate dense estimates if necessary) |
| boolean sparseUC = MatrixBlock.evalSparseFormatInMemory(numRows, colsUC.size(), nnzUC); |
| if( !sparseUC && !colsUC.isEmpty() ) { |
| for( int i=0; i<colsUC.size(); i++ ) { |
| int col = colsUC.get(i); |
| double uncompSize = getUncompressedSize(numRows, 1, 1.0); |
| double compRatio = uncompSize / sizeInfos[col].getMinSize(); |
| if( compRatio > 1 ) { |
| colsC.add(col); |
| colsUC.remove(i); i--; |
| compRatios.put(col, compRatio); |
| nnzUC -= sizeInfos[col].getEstNnz(); |
| } |
| } |
| } |
| |
| if( LOG.isTraceEnabled() ) { |
| LOG.trace("C: "+Arrays.toString(colsC.toArray(new Integer[0]))); |
| LOG.trace("-- compression ratios: "+Arrays.toString( |
| colsC.stream().map(c -> compRatios.get(c)).toArray())); |
| LOG.trace("UC: "+Arrays.toString(colsUC.toArray(new Integer[0]))); |
| LOG.trace("-- compression ratios: "+Arrays.toString( |
| colsUC.stream().map(c -> compRatios.get(c)).toArray())); |
| } |
| |
| if( LOG.isDebugEnabled() ) { |
| _stats.timePhase1 = time.stop(); |
| LOG.debug("Compression statistics:"); |
| LOG.debug("--compression phase 1: "+_stats.timePhase1); |
| } |
| |
| if( colsC.isEmpty() ) { |
| if( LOG.isDebugEnabled() ) |
| LOG.debug("Abort block compression because all columns are incompressible."); |
| return new MatrixBlock().copyShallow(this); |
| } |
| |
| // PHASE 2: Grouping columns |
| // Divide the bitmap columns into column groups. |
| List<int[]> bitmapColGrps = PlanningCoCoder.findCocodesByPartitioning( |
| bitmapSizeEstimator, colsC, sizeInfos, numRows, k); |
| |
| if( LOG.isDebugEnabled() ) { |
| _stats.timePhase2 = time.stop(); |
| LOG.debug("--compression phase 2: "+_stats.timePhase2); |
| } |
| |
| if( INVESTIGATE_ESTIMATES ) { |
| double est = 0; |
| for( int[] groupIndices : bitmapColGrps ) |
| est += bitmapSizeEstimator.estimateCompressedColGroupSize(groupIndices).getMinSize(); |
| est += MatrixBlock.estimateSizeInMemory(numRows, colsUC.size(), |
| OptimizerUtils.getSparsity(numRows, colsUC.size(), nnzUC)); |
| _stats.estSize = est; |
| } |
| |
| // PHASE 3: Compress and correct sample-based decisions |
| ColGroup[] colGroups = (k > 1) ? |
| compressColGroups(rawblock, bitmapSizeEstimator, compRatios, numRows, bitmapColGrps, colsUC.isEmpty(), k) : |
| compressColGroups(rawblock, bitmapSizeEstimator, compRatios, numRows, bitmapColGrps, colsUC.isEmpty()); |
| allocateColGroupList(); |
| HashSet<Integer> remainingCols = seq(0, numCols-1, 1); |
| for( int j=0; j<colGroups.length; j++ ) { |
| if( colGroups[j] != null ) { |
| for( int col : colGroups[j].getColIndices() ) |
| remainingCols.remove(col); |
| _colGroups.add(colGroups[j]); |
| } |
| } |
| |
| if( LOG.isDebugEnabled() ) { |
| _stats.timePhase3 = time.stop(); |
| LOG.debug("--compression phase 3: "+_stats.timePhase3); |
| } |
| |
| // PHASE 4: Best-effort dictionary sharing for DDC1 single-col groups |
| double[] dict = createSharedDDC1Dictionary(_colGroups); |
| if( dict != null ) { |
| applySharedDDC1Dictionary(_colGroups, dict); |
| _sharedDDC1Dict = true; |
| } |
| |
| if( LOG.isDebugEnabled() ) { |
| _stats.timePhase4 = time.stop(); |
| LOG.debug("--compression phase 4: "+_stats.timePhase4); |
| } |
| |
| // Phase 5: Cleanup |
| // The remaining columns are stored uncompressed as one big column group |
| if( !remainingCols.isEmpty() ) { |
| ArrayList<Integer> list = new ArrayList<>(remainingCols); |
| ColGroupUncompressed ucgroup = new ColGroupUncompressed(list, rawblock); |
| _colGroups.add(ucgroup); |
| } |
| |
| _stats.size = estimateCompressedSizeInMemory(); |
| _stats.ratio= estimateSizeInMemory() / _stats.size; |
| |
| if( _stats.ratio < 1 ) { |
| if( LOG.isDebugEnabled() ) |
| LOG.debug("Abort block compression because compression ratio is less than 1."); |
| return new MatrixBlock().copyShallow(this); |
| } |
| |
| //final cleanup (discard uncompressed block) |
| rawblock.cleanupBlock(true, true); |
| this.cleanupBlock(true, true); |
| |
| if( LOG.isDebugEnabled() ) { |
| _stats.timePhase5 = time.stop(); |
| int[] counts = getColGroupCounts(_colGroups); |
| LOG.debug("--compression phase 5: "+_stats.timePhase5); |
| LOG.debug("--num col groups: "+_colGroups.size()); |
| LOG.debug("--col groups types (OLE,RLE,DDC1,DDC2,UC): " |
| +counts[2]+","+counts[1]+","+counts[3]+","+counts[4]+","+counts[0]); |
| LOG.debug("--col groups sizes (OLE,RLE,DDC1,DDC2,UC): " |
| +counts[7]+","+counts[6]+","+counts[8]+","+counts[9]+","+counts[5]); |
| LOG.debug("--compressed size: "+_stats.size); |
| LOG.debug("--compression ratio: "+_stats.ratio); |
| } |
| |
| return this; |
| } |
| |
| public CompressionStatistics getCompressionStatistics() { |
| return _stats; |
| } |
| |
| /** |
| * Get array of counts regarding col group types. The position |
| * corresponds with the enum ordinal. |
| * |
| * @param colgroups list of column groups |
| * @return counts |
| */ |
| private static int[] getColGroupCounts(ArrayList<ColGroup> colgroups) { |
| int[] ret = new int[10]; //5 x count, 5 x num_columns |
| for( ColGroup c : colgroups ) { |
| ret[c.getCompType().ordinal()] ++; |
| ret[5+c.getCompType().ordinal()] += c.getNumCols(); |
| } |
| return ret; |
| } |
| |
| private static CompressedSizeInfo[] computeCompressedSizeInfos(CompressedSizeEstimator estim, int clen) { |
| CompressedSizeInfo[] ret = new CompressedSizeInfo[clen]; |
| for( int col=0; col<clen; col++ ) |
| ret[col] = estim.estimateCompressedColGroupSize(new int[] { col }); |
| return ret; |
| } |
| |
| private static CompressedSizeInfo[] computeCompressedSizeInfos(CompressedSizeEstimator estim, int clen, int k) |
| { |
| try { |
| ExecutorService pool = CommonThreadPool.get(k); |
| ArrayList<SizeEstimTask> tasks = new ArrayList<>(); |
| for( int col=0; col<clen; col++ ) |
| tasks.add(new SizeEstimTask(estim, col)); |
| List<Future<CompressedSizeInfo>> rtask = pool.invokeAll(tasks); |
| ArrayList<CompressedSizeInfo> ret = new ArrayList<>(); |
| for( Future<CompressedSizeInfo> lrtask : rtask ) |
| ret.add(lrtask.get()); |
| pool.shutdown(); |
| return ret.toArray(new CompressedSizeInfo[0]); |
| } |
| catch(Exception ex) { |
| throw new DMLRuntimeException(ex); |
| } |
| } |
| |
| private static ColGroup[] compressColGroups(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, List<int[]> groups, boolean denseEst) { |
| ColGroup[] ret = new ColGroup[groups.size()]; |
| for( int i=0; i<groups.size(); i++ ) |
| ret[i] = compressColGroup(in, estim, compRatios, rlen, groups.get(i), denseEst); |
| |
| return ret; |
| } |
| |
| private static ColGroup[] compressColGroups(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, List<int[]> groups, boolean denseEst, int k) |
| { |
| try { |
| ExecutorService pool = CommonThreadPool.get(k); |
| ArrayList<CompressTask> tasks = new ArrayList<>(); |
| for( int[] colIndexes : groups ) |
| tasks.add(new CompressTask(in, estim, compRatios, rlen, colIndexes, denseEst)); |
| List<Future<ColGroup>> rtask = pool.invokeAll(tasks); |
| ArrayList<ColGroup> ret = new ArrayList<>(); |
| for( Future<ColGroup> lrtask : rtask ) |
| ret.add(lrtask.get()); |
| pool.shutdown(); |
| return ret.toArray(new ColGroup[0]); |
| } |
| catch(Exception ex) { |
| throw new DMLRuntimeException(ex); |
| } |
| } |
| |
| private static ColGroup compressColGroup(MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, int[] colIndexes, boolean denseEst) |
| { |
| int[] allGroupIndices = null; |
| int allColsCount = colIndexes.length; |
| CompressedSizeInfo sizeInfo; |
| // The compression type is decided based on a full bitmap since it |
| // will be reused for the actual compression step. |
| UncompressedBitmap ubm = null; |
| PriorityQueue<CompressedColumn> compRatioPQ = null; |
| boolean skipGroup = false; |
| while (true) |
| { |
| //exact big list and observe compression ratio |
| ubm = BitmapEncoder.extractBitmap(colIndexes, in); |
| sizeInfo = estim.estimateCompressedColGroupSize(ubm); |
| double sp2 = denseEst ? 1.0 : OptimizerUtils.getSparsity(rlen, 1, ubm.getNumOffsets()); |
| double compRatio = getUncompressedSize(rlen, colIndexes.length, sp2) / sizeInfo.getMinSize(); |
| |
| if( compRatio > 1 ) { |
| break; // we have a good group |
| } |
| |
| // modify the group |
| if (compRatioPQ == null) { |
| // first modification |
| allGroupIndices = colIndexes.clone(); |
| compRatioPQ = new PriorityQueue<>(); |
| for (int i = 0; i < colIndexes.length; i++) |
| compRatioPQ.add(new CompressedColumn(i, compRatios.get(colIndexes[i]))); |
| } |
| |
| // index in allGroupIndices |
| int removeIx = compRatioPQ.poll().colIx; |
| allGroupIndices[removeIx] = -1; |
| allColsCount--; |
| if (allColsCount == 0) { |
| skipGroup = true; |
| break; |
| } |
| colIndexes = new int[allColsCount]; |
| // copying the values that do not equal -1 |
| int ix = 0; |
| for(int col : allGroupIndices) |
| if (col != -1) |
| colIndexes[ix++] = col; |
| } |
| |
| //add group to uncompressed fallback |
| if( skipGroup ) |
| return null; |
| |
| //create compressed column group |
| long rleSize = sizeInfo.getRLESize(); |
| long oleSize = sizeInfo.getOLESize(); |
| long ddcSize = sizeInfo.getDDCSize(); |
| |
| if( ALLOW_DDC_ENCODING && ddcSize < rleSize && ddcSize < oleSize ) { |
| if( ubm.getNumValues()<=255 ) |
| return new ColGroupDDC1(colIndexes, rlen, ubm); |
| else |
| return new ColGroupDDC2(colIndexes, rlen, ubm); |
| } |
| else if( rleSize < oleSize ) |
| return new ColGroupRLE(colIndexes, rlen, ubm); |
| else |
| return new ColGroupOLE(colIndexes, rlen, ubm); |
| } |
| |
| /** |
| * Compute a conservative estimate of the uncompressed size of a column group. |
| * |
| * @param rlen row length |
| * @param clen column length |
| * @param sparsity the sparsity |
| * @return estimate of uncompressed size of column group |
| */ |
| private static double getUncompressedSize(int rlen, int clen, double sparsity) { |
| //we estimate the uncompressed size as the minimum of dense representation |
| //and representation in csr, which moderately overestimates sparse representations |
| //of single columns but helps avoid anomalies with sparse columns that are |
| //eventually represented in dense |
| return Math.min(8d * rlen * clen, 4d * rlen + 12d * rlen * clen * sparsity); |
| } |
| |
| private static double[] createSharedDDC1Dictionary(ArrayList<ColGroup> colGroups) { |
| if( !ALLOW_DDC_ENCODING || !ALLOW_SHARED_DDC1_DICTIONARY ) |
| return null; |
| |
| //create joint dictionary |
| HashSet<Double> tmp = new HashSet<>(); |
| int numQual = 0; |
| for( ColGroup grp : colGroups ) |
| if( grp.getNumCols()==1 && grp instanceof ColGroupDDC1 ) { |
| ColGroupDDC1 grpDDC1 = (ColGroupDDC1) grp; |
| for( double val : grpDDC1.getValues() ) |
| tmp.add(val); |
| numQual ++; |
| } |
| |
| //abort shared dictionary creation if empty or too large |
| int maxSize = tmp.contains(0d) ? 256 : 255; |
| if( tmp.isEmpty() || tmp.size() > maxSize || numQual < 2 ) |
| return null; |
| LOG.debug("Created shared directionary for " |
| + numQual+" DDC1 single column groups."); |
| |
| //build consolidated dictionary |
| return tmp.stream().mapToDouble(Double::doubleValue).toArray(); |
| } |
| |
| private static void applySharedDDC1Dictionary(ArrayList<ColGroup> colGroups, double[] dict) { |
| //create joint mapping table |
| HashMap<Double, Integer> map = new HashMap<>(); |
| for(int i=0; i<dict.length; i++) |
| map.put(dict[i], i); |
| |
| //recode data of all relevant DDC1 groups |
| for( ColGroup grp : colGroups ) |
| if( grp.getNumCols()==1 && grp instanceof ColGroupDDC1 ) { |
| ColGroupDDC1 grpDDC1 = (ColGroupDDC1) grp; |
| grpDDC1.recodeData(map); |
| grpDDC1.setValues(dict); |
| } |
| } |
| |
| /** |
| * Decompress block. |
| * |
| * @return a new uncompressed matrix block containing the contents of this |
| * block |
| */ |
| public MatrixBlock decompress() { |
| //early abort for not yet compressed blocks |
| if( !isCompressed() ) |
| return new MatrixBlock(this); |
| |
| Timing time = new Timing(true); |
| |
| //preallocation sparse rows to avoid repeated reallocations |
| MatrixBlock ret = new MatrixBlock(getNumRows(), getNumColumns(), isInSparseFormat(), getNonZeros()); |
| if( ret.isInSparseFormat() ) { |
| int[] rnnz = new int[rlen]; |
| for (ColGroup grp : _colGroups) |
| grp.countNonZerosPerRow(rnnz, 0, rlen); |
| ret.allocateSparseRowsBlock(); |
| SparseBlock rows = ret.getSparseBlock(); |
| for( int i=0; i<rlen; i++ ) |
| rows.allocate(i, rnnz[i]); |
| } |
| |
| //core decompression (append if sparse) |
| for (ColGroup grp : _colGroups) |
| grp.decompressToBlock(ret, 0, rlen); |
| |
| //post-processing (for append in decompress) |
| ret.setNonZeros(nonZeros); |
| if( ret.isInSparseFormat() ) |
| ret.sortSparseRows(); |
| |
| if( LOG.isDebugEnabled() ) |
| LOG.debug("decompressed block in "+time.stop()+"ms."); |
| |
| return ret; |
| } |
| |
| /** |
| * Decompress block. |
| * |
| * @param k degree of parallelism |
| * @return a new uncompressed matrix block containing the contents |
| * of this block |
| */ |
| public MatrixBlock decompress(int k) { |
| //early abort for not yet compressed blocks |
| if( !isCompressed() ) |
| return new MatrixBlock(this); |
| if( k <= 1 ) |
| return decompress(); |
| |
| Timing time = LOG.isDebugEnabled() ? new Timing(true) : null; |
| |
| MatrixBlock ret = new MatrixBlock(rlen, clen, sparse, nonZeros).allocateBlock(); |
| |
| //multi-threaded decompression |
| try { |
| ExecutorService pool = CommonThreadPool.get(k); |
| int rlen = getNumRows(); |
| int blklen = BitmapEncoder.getAlignedBlocksize( |
| (int)(Math.ceil((double)rlen/k))); |
| ArrayList<DecompressTask> tasks = new ArrayList<>(); |
| for( int i=0; i<k & i*blklen<getNumRows(); i++ ) |
| tasks.add(new DecompressTask(_colGroups, ret, i*blklen, Math.min((i+1)*blklen,rlen))); |
| List<Future<Object>> rtasks = pool.invokeAll(tasks); |
| pool.shutdown(); |
| for( Future<Object> rt : rtasks ) |
| rt.get(); //error handling |
| } |
| catch(Exception ex) { |
| throw new DMLRuntimeException(ex); |
| } |
| |
| //post-processing |
| ret.setNonZeros(nonZeros); |
| |
| if( LOG.isDebugEnabled() ) |
| LOG.debug("decompressed block w/ k="+k+" in "+time.stop()+"ms."); |
| |
| return ret; |
| } |
| |
| /** |
| * Obtain an upper bound on the memory used to store the compressed block. |
| * |
| * @return an upper bound on the memory used to store this compressed block |
| * considering class overhead. |
| */ |
| public long estimateCompressedSizeInMemory() { |
| if (!isCompressed()) |
| return 0; |
| // basic data inherited from MatrixBlock |
| long total = MatrixBlock.estimateSizeInMemory(0, 0, 0); |
| // adding the size of colGroups ArrayList overhead |
| // object overhead (32B) + int size (4B) + int modCount (4B) + Object[] |
| // elementData overhead + reference (32+8)B +reference ofr each Object (8B) |
| total += 80 + 8 * _colGroups.size(); |
| for (ColGroup grp : _colGroups) |
| total += grp.estimateInMemorySize(); |
| //correction for shared DDC1 dictionary |
| if( _sharedDDC1Dict ) { |
| boolean seenDDC1 = false; |
| for (ColGroup grp : _colGroups) |
| if( grp.getNumCols()==1 && grp instanceof ColGroupDDC1 ) { |
| if( seenDDC1 ) |
| total -= ((ColGroupDDC1)grp).getValuesSize(); |
| seenDDC1 = true; |
| } |
| } |
| return total; |
| } |
| |
| private static class CompressedColumn implements Comparable<CompressedColumn> { |
| final int colIx; |
| final double compRatio; |
| |
| public CompressedColumn(int colIx, double compRatio) { |
| this.colIx = colIx; |
| this.compRatio = compRatio; |
| } |
| |
| @Override |
| public int compareTo(CompressedColumn o) { |
| return (int) Math.signum(compRatio - o.compRatio); |
| } |
| } |
| |
| public static class CompressionStatistics { |
| public double timePhase1 = -1; |
| public double timePhase2 = -1; |
| public double timePhase3 = -1; |
| public double timePhase4 = -1; |
| public double timePhase5 = -1; |
| public double estSize = -1; |
| public double size = -1; |
| public double ratio = -1; |
| |
| public CompressionStatistics() { |
| //do nothing |
| } |
| |
| public CompressionStatistics(double t1, double t2, double t3, double t4, double t5){ |
| timePhase1 = t1; |
| timePhase2 = t2; |
| timePhase3 = t3; |
| timePhase4 = t4; |
| timePhase5 = t5; |
| } |
| } |
| |
| @Override |
| public double quickGetValue(int r, int c) { |
| if( !isCompressed() ) { |
| return super.quickGetValue(r, c); |
| } |
| |
| //find column group according to col index |
| ColGroup grp = null; |
| for( ColGroup group : _colGroups ) |
| if( Arrays.binarySearch(group.getColIndices(), c) >= 0 ) { |
| grp = group; break; |
| } |
| |
| //find row value |
| return grp.get(r, c); |
| } |
| |
| ////////////////////////////////////////// |
| // Serialization / Deserialization |
| |
| @Override |
| public long getExactSizeOnDisk() { |
| //header information |
| long ret = 22; |
| for( ColGroup grp : _colGroups ) { |
| ret += 1; //type info |
| ret += grp.getExactSizeOnDisk(); |
| } |
| return ret; |
| } |
| |
| @Override |
| public boolean isShallowSerialize() { |
| return false; |
| } |
| |
| @Override |
| public boolean isShallowSerialize(boolean inclConvert) { |
| return false; |
| } |
| |
| @Override |
| public void toShallowSerializeBlock() { |
| //do nothing |
| } |
| |
| @Override |
| public void readFields(DataInput in) |
| throws IOException |
| { |
| boolean compressed = in.readBoolean(); |
| |
| //deserialize uncompressed block |
| if( !compressed ) { |
| super.readFields(in); |
| return; |
| } |
| |
| //deserialize compressed block |
| rlen = in.readInt(); |
| clen = in.readInt(); |
| nonZeros = in.readLong(); |
| _sharedDDC1Dict = in.readBoolean(); |
| int ncolGroups = in.readInt(); |
| |
| _colGroups = new ArrayList<>(ncolGroups); |
| double[] sharedDict = null; |
| for( int i=0; i<ncolGroups; i++ ) |
| { |
| CompressionType ctype = CompressionType.values()[in.readByte()]; |
| ColGroup grp = null; |
| |
| //create instance of column group |
| switch( ctype ) { |
| case UNCOMPRESSED: |
| grp = new ColGroupUncompressed(); break; |
| case OLE_BITMAP: |
| grp = new ColGroupOLE(); break; |
| case RLE_BITMAP: |
| grp = new ColGroupRLE(); break; |
| case DDC1: |
| grp = new ColGroupDDC1(); break; |
| case DDC2: |
| grp = new ColGroupDDC2(); break; |
| } |
| |
| //deserialize and add column group (flag for shared dictionary passed |
| //and numCols evaluated in DDC1 because numCols not available yet |
| grp.readFields(in, sharedDict!=null); |
| |
| //use shared DDC1 dictionary if applicable |
| if( _sharedDDC1Dict && grp.getNumCols()==1 |
| && grp instanceof ColGroupDDC1 ) { |
| if( sharedDict == null ) |
| sharedDict = ((ColGroupValue)grp).getValues(); |
| else |
| ((ColGroupValue)grp).setValues(sharedDict); |
| } |
| |
| _colGroups.add(grp); |
| } |
| } |
| |
| @Override |
| public void write(DataOutput out) |
| throws IOException |
| { |
| out.writeBoolean( isCompressed() ); |
| |
| //serialize uncompressed block |
| if( !isCompressed() ) { |
| super.write(out); |
| return; |
| } |
| |
| //serialize compressed matrix block |
| out.writeInt(rlen); |
| out.writeInt(clen); |
| out.writeLong(nonZeros); |
| out.writeBoolean(_sharedDDC1Dict); |
| out.writeInt(_colGroups.size()); |
| |
| boolean skipDict = false; |
| for( ColGroup grp : _colGroups ) { |
| boolean shared = (grp instanceof ColGroupDDC1 |
| && _sharedDDC1Dict && grp.getNumCols()==1); |
| out.writeByte( grp.getCompType().ordinal() ); |
| grp.write(out, skipDict & shared); //delegate serialization |
| skipDict |= shared; |
| } |
| } |
| |
| |
| /** |
| * Redirects the default java serialization via externalizable to our default |
| * hadoop writable serialization for efficient broadcast/rdd deserialization. |
| * |
| * @param is object input |
| * @throws IOException if IOException occurs |
| */ |
| @Override |
| public void readExternal(ObjectInput is) |
| throws IOException |
| { |
| readFields(is); |
| } |
| |
| /** |
| * Redirects the default java serialization via externalizable to our default |
| * hadoop writable serialization for efficient broadcast/rdd serialization. |
| * |
| * @param os object output |
| * @throws IOException if IOException occurs |
| */ |
| @Override |
| public void writeExternal(ObjectOutput os) |
| throws IOException |
| { |
| write(os); |
| } |
| |
| public Iterator<IJV> getIterator(int rl, int ru, boolean inclZeros) { |
| return getIterator(rl, ru, 0, getNumColGroups(), inclZeros); |
| } |
| |
| public Iterator<IJV> getIterator(int rl, int ru, int cgl, int cgu, boolean inclZeros) { |
| return new ColumnGroupIterator(rl, ru, cgl, cgu, inclZeros); |
| } |
| |
| public Iterator<double[]> getDenseRowIterator(int rl, int ru) { |
| return new DenseRowIterator(rl, ru); |
| } |
| |
| public Iterator<SparseRow> getSparseRowIterator(int rl, int ru) { |
| return new SparseRowIterator(rl, ru); |
| } |
| |
| public int[] countNonZerosPerRow(int rl, int ru) { |
| int[] rnnz = new int[ru-rl]; |
| for (ColGroup grp : _colGroups) |
| grp.countNonZerosPerRow(rnnz, rl, ru); |
| return rnnz; |
| } |
| |
| ////////////////////////////////////////// |
| // Operations (overwrite existing ops for seamless integration) |
| |
| @Override |
| public MatrixValue scalarOperations(ScalarOperator sop, MatrixValue result) |
| { |
| //call uncompressed matrix scalar if necessary |
| if( !isCompressed() ) { |
| return super.scalarOperations(sop, result); |
| } |
| |
| //allocate the output matrix block |
| CompressedMatrixBlock ret = null; |
| if( result==null || !(result instanceof CompressedMatrixBlock) ) |
| ret = new CompressedMatrixBlock(getNumRows(), getNumColumns(), sparse); |
| else { |
| ret = (CompressedMatrixBlock) result; |
| ret.reset(rlen, clen); |
| } |
| |
| // Apply the operation recursively to each of the column groups. |
| // Most implementations will only modify metadata. |
| ArrayList<ColGroup> newColGroups = new ArrayList<>(); |
| for (ColGroup grp : _colGroups) { |
| newColGroups.add(grp.scalarOperation(sop)); |
| } |
| ret._colGroups = newColGroups; |
| ret.setNonZeros(rlen*clen); |
| |
| return ret; |
| } |
| |
| @Override |
| public MatrixBlock append(MatrixBlock that, MatrixBlock ret) |
| { |
| //call uncompressed matrix append if necessary |
| if( !isCompressed() ) { |
| if( that instanceof CompressedMatrixBlock ) |
| that = ((CompressedMatrixBlock) that).decompress(); |
| return super.append(that, ret, true); |
| } |
| |
| final int m = rlen; |
| final int n = clen+that.getNumColumns(); |
| final long nnz = nonZeros+that.getNonZeros(); |
| |
| //init result matrix |
| CompressedMatrixBlock ret2 = null; |
| if( ret == null || !(ret instanceof CompressedMatrixBlock) ) { |
| ret2 = new CompressedMatrixBlock(m, n, isInSparseFormat()); |
| } |
| else { |
| ret2 = (CompressedMatrixBlock) ret; |
| ret2.reset(m, n); |
| } |
| |
| //shallow copy of lhs column groups |
| ret2.allocateColGroupList(); |
| ret2._colGroups.addAll(_colGroups); |
| |
| //copy of rhs column groups w/ col index shifting |
| if( !(that instanceof CompressedMatrixBlock) ) { |
| that = new CompressedMatrixBlock(that); |
| ((CompressedMatrixBlock)that).compress(); |
| } |
| ArrayList<ColGroup> inColGroups = ((CompressedMatrixBlock) that)._colGroups; |
| for( ColGroup group : inColGroups ) { |
| ColGroup tmp = ConverterUtils.copyColGroup(group); |
| tmp.shiftColIndices(clen); |
| ret2._colGroups.add(tmp); |
| } |
| |
| //meta data maintenance |
| ret2.setNonZeros(nnz); |
| return ret2; |
| } |
| |
| @Override |
| public MatrixBlock chainMatrixMultOperations(MatrixBlock v, MatrixBlock w, MatrixBlock out, ChainType ctype) |
| { |
| //call uncompressed matrix mult if necessary |
| if( !isCompressed() ) { |
| return super.chainMatrixMultOperations(v, w, out, ctype); |
| } |
| |
| //single-threaded mmchain of single uncompressed colgroup |
| if( isSingleUncompressedGroup() ){ |
| return ((ColGroupUncompressed)_colGroups.get(0)) |
| .getData().chainMatrixMultOperations(v, w, out, ctype); |
| } |
| |
| //Timing time = new Timing(true); |
| |
| //prepare result |
| if( out != null ) |
| out.reset(clen, 1, false); |
| else |
| out = new MatrixBlock(clen, 1, false); |
| |
| //empty block handling |
| if( isEmptyBlock(false) ) |
| return out; |
| |
| //compute matrix mult |
| MatrixBlock tmp = new MatrixBlock(rlen, 1, false); |
| rightMultByVector(v, tmp); |
| if( ctype == ChainType.XtwXv ) { |
| BinaryOperator bop = new BinaryOperator(Multiply.getMultiplyFnObject()); |
| LibMatrixBincell.bincellOpInPlace(tmp, w, bop); |
| } |
| leftMultByVectorTranspose(_colGroups, tmp, out, true, true); |
| |
| //System.out.println("Compressed MMChain in "+time.stop()); |
| |
| return out; |
| } |
| |
| @Override |
| public MatrixBlock chainMatrixMultOperations(MatrixBlock v, MatrixBlock w, MatrixBlock out, ChainType ctype, int k) |
| { |
| //call uncompressed matrix mult if necessary |
| if( !isCompressed() ){ |
| return super.chainMatrixMultOperations(v, w, out, ctype, k); |
| } |
| |
| //multi-threaded mmchain of single uncompressed colgroup |
| if( isSingleUncompressedGroup() ){ |
| return ((ColGroupUncompressed)_colGroups.get(0)) |
| .getData().chainMatrixMultOperations(v, w, out, ctype, k); |
| } |
| |
| Timing time = LOG.isDebugEnabled() ? new Timing(true) : null; |
| |
| //prepare result |
| if( out != null ) |
| out.reset(clen, 1, false); |
| else |
| out = new MatrixBlock(clen, 1, false); |
| |
| //empty block handling |
| if( isEmptyBlock(false) ) |
| return out; |
| |
| //compute matrix mult |
| MatrixBlock tmp = new MatrixBlock(rlen, 1, false); |
| rightMultByVector(v, tmp, k); |
| if( ctype == ChainType.XtwXv ) { |
| BinaryOperator bop = new BinaryOperator(Multiply.getMultiplyFnObject()); |
| LibMatrixBincell.bincellOpInPlace(tmp, w, bop); |
| } |
| leftMultByVectorTranspose(_colGroups, tmp, out, true, k); |
| |
| if( LOG.isDebugEnabled() ) |
| LOG.debug("Compressed MMChain k="+k+" in "+time.stop()); |
| |
| return out; |
| } |
| |
| @Override |
| public MatrixBlock aggregateBinaryOperations(MatrixBlock m1, MatrixBlock m2, MatrixBlock ret, AggregateBinaryOperator op) |
| { |
| //call uncompressed matrix mult if necessary |
| if( !isCompressed() ) { |
| return super.aggregateBinaryOperations(m1, m2, ret, op); |
| } |
| |
| //multi-threaded mm of single uncompressed colgroup |
| if( isSingleUncompressedGroup() ){ |
| MatrixBlock tmp = ((ColGroupUncompressed)_colGroups.get(0)).getData(); |
| return tmp.aggregateBinaryOperations(this==m1?tmp:m1, this==m2?tmp:m2, ret, op); |
| } |
| |
| Timing time = LOG.isDebugEnabled() ? new Timing(true) : null; |
| |
| //setup meta data (dimensions, sparsity) |
| int rl = m1.getNumRows(); |
| int cl = m2.getNumColumns(); |
| |
| //create output matrix block |
| if( ret==null ) |
| ret = new MatrixBlock(rl, cl, false, rl*cl); |
| else |
| ret.reset(rl, cl, false, rl*cl); |
| |
| //compute matrix mult |
| if( m1.getNumRows()>1 && m2.getNumColumns()==1 ) { //MV right |
| CompressedMatrixBlock cmb = (CompressedMatrixBlock)m1; |
| if( op.getNumThreads()>1 ) |
| cmb.rightMultByVector(m2, ret, op.getNumThreads()); |
| else |
| cmb.rightMultByVector(m2, ret); |
| } |
| else if( m1.getNumRows()==1 && m2.getNumColumns()>1 ) { //MV left |
| if( op.getNumThreads()>1 ) |
| leftMultByVectorTranspose(_colGroups, m1, ret, false, op.getNumThreads()); |
| else |
| leftMultByVectorTranspose(_colGroups, m1, ret, false, true); |
| } |
| else { //MM |
| //prepare the other input (including decompression if necessary) |
| boolean right = (m1 == this); |
| MatrixBlock that = right ? m2 : m1; |
| if( that instanceof CompressedMatrixBlock ) { |
| that = ((CompressedMatrixBlock)that).isCompressed() ? |
| ((CompressedMatrixBlock)that).decompress() : that; |
| } |
| |
| //transpose for sequential repeated column access |
| if( right ) { |
| that = LibMatrixReorg.transpose(that, new MatrixBlock(that.getNumColumns(), |
| that.getNumRows(), that.isInSparseFormat()), op.getNumThreads()); |
| } |
| |
| MatrixBlock tmpIn = new MatrixBlock(1, that.getNumColumns(), false).allocateBlock(); |
| MatrixBlock tmpOut = new MatrixBlock(right?rl:1, right?1:cl, false).allocateBlock(); |
| if( right ) { //MM right |
| for(int i=0; i<that.getNumRows(); i++) { //on transpose |
| tmpIn = that.slice(i, i, 0, that.getNumColumns()-1, tmpIn); |
| MatrixBlock tmpIn2 = LibMatrixReorg.transpose(tmpIn, //meta data op |
| new MatrixBlock(tmpIn.getNumColumns(), tmpIn.getNumRows(), false)); |
| tmpOut.reset(tmpOut.getNumRows(), tmpOut.getNumColumns()); |
| if( op.getNumThreads()>1 ) |
| rightMultByVector(tmpIn2, tmpOut, op.getNumThreads()); |
| else |
| rightMultByVector(tmpIn2, tmpOut); |
| ret.leftIndexingOperations(tmpOut, 0, ret.getNumRows()-1, i, i, ret, UpdateType.INPLACE); |
| } |
| } |
| else { // MM left |
| for(int i=0; i<that.getNumRows(); i++) { |
| tmpIn = that.slice(i, i, 0, that.getNumColumns()-1, tmpIn); |
| if( op.getNumThreads()>1 ) |
| leftMultByVectorTranspose(_colGroups, tmpIn, tmpOut, false, op.getNumThreads()); |
| else |
| leftMultByVectorTranspose(_colGroups, tmpIn, tmpOut, false, true); |
| ret.leftIndexingOperations(tmpOut, i, i, 0, ret.getNumColumns()-1, ret, UpdateType.INPLACE); |
| } |
| } |
| } |
| |
| if( LOG.isDebugEnabled() ) |
| LOG.debug("Compressed MM in "+time.stop()); |
| |
| return ret; |
| } |
| |
| @Override |
| public MatrixValue aggregateUnaryOperations(AggregateUnaryOperator op, MatrixValue result, |
| int blockingFactorRow, int blockingFactorCol, MatrixIndexes indexesIn, boolean inCP) |
| { |
| //call uncompressed matrix mult if necessary |
| if( !isCompressed() ) { |
| return super.aggregateUnaryOperations(op, result, blockingFactorRow, blockingFactorCol, indexesIn, inCP); |
| } |
| |
| //check for supported operations |
| if( !(op.aggOp.increOp.fn instanceof KahanPlus || op.aggOp.increOp.fn instanceof KahanPlusSq |
| || (op.aggOp.increOp.fn instanceof Builtin && |
| (((Builtin)op.aggOp.increOp.fn).getBuiltinCode()==BuiltinCode.MIN |
| ||((Builtin)op.aggOp.increOp.fn).getBuiltinCode()==BuiltinCode.MAX))) ){ |
| throw new DMLRuntimeException("Unary aggregates other than sum/sumsq/min/max not supported yet."); |
| } |
| |
| Timing time = LOG.isDebugEnabled() ? new Timing(true) : null; |
| |
| //prepare output dimensions |
| CellIndex tempCellIndex = new CellIndex(-1,-1); |
| op.indexFn.computeDimension(rlen, clen, tempCellIndex); |
| if(op.aggOp.correctionExists) { |
| switch(op.aggOp.correctionLocation) |
| { |
| case LASTROW: tempCellIndex.row++; break; |
| case LASTCOLUMN: tempCellIndex.column++; break; |
| case LASTTWOROWS: tempCellIndex.row+=2; break; |
| case LASTTWOCOLUMNS: tempCellIndex.column+=2; break; |
| default: |
| throw new DMLRuntimeException("unrecognized correctionLocation: "+op.aggOp.correctionLocation); |
| } |
| } |
| |
| // initialize and allocate the result |
| if(result==null) |
| result=new MatrixBlock(tempCellIndex.row, tempCellIndex.column, false); |
| else |
| result.reset(tempCellIndex.row, tempCellIndex.column, false); |
| MatrixBlock ret = (MatrixBlock) result; |
| ret.allocateDenseBlock(); |
| |
| //special handling init value for rowmins/rowmax |
| if( op.indexFn instanceof ReduceCol && op.aggOp.increOp.fn instanceof Builtin ) { |
| double val = (((Builtin)op.aggOp.increOp.fn).getBuiltinCode()==BuiltinCode.MAX) ? |
| Double.NEGATIVE_INFINITY : Double.POSITIVE_INFINITY; |
| ret.getDenseBlock().set(val); |
| } |
| |
| //core unary aggregate |
| if( op.getNumThreads() > 1 |
| && getExactSizeOnDisk() > MIN_PAR_AGG_THRESHOLD ) |
| { |
| //multi-threaded execution of all groups |
| ArrayList<ColGroup>[] grpParts = createStaticTaskPartitioning( |
| (op.indexFn instanceof ReduceCol) ? 1 : op.getNumThreads(), false); |
| ColGroupUncompressed uc = getUncompressedColGroup(); |
| try { |
| //compute uncompressed column group in parallel (otherwise bottleneck) |
| if( uc != null ) |
| uc.unaryAggregateOperations(op, ret); |
| //compute all compressed column groups |
| ExecutorService pool = CommonThreadPool.get(op.getNumThreads()); |
| ArrayList<UnaryAggregateTask> tasks = new ArrayList<>(); |
| if( op.indexFn instanceof ReduceCol && grpParts.length > 0 ) { |
| int blklen = BitmapEncoder.getAlignedBlocksize( |
| (int)(Math.ceil((double)rlen/op.getNumThreads()))); |
| for( int i=0; i<op.getNumThreads() & i*blklen<rlen; i++ ) |
| tasks.add(new UnaryAggregateTask(grpParts[0], ret, i*blklen, Math.min((i+1)*blklen,rlen), op)); |
| } |
| else |
| for( ArrayList<ColGroup> grp : grpParts ) |
| tasks.add(new UnaryAggregateTask(grp, ret, 0, rlen, op)); |
| List<Future<MatrixBlock>> rtasks = pool.invokeAll(tasks); |
| pool.shutdown(); |
| |
| //aggregate partial results |
| if( op.indexFn instanceof ReduceAll ) { |
| if( op.aggOp.increOp.fn instanceof KahanFunction ) { |
| KahanObject kbuff = new KahanObject(ret.quickGetValue(0, 0), 0); |
| for( Future<MatrixBlock> rtask : rtasks ) { |
| double tmp = rtask.get().quickGetValue(0, 0); |
| ((KahanFunction) op.aggOp.increOp.fn).execute2(kbuff, tmp); |
| } |
| ret.quickSetValue(0, 0, kbuff._sum); |
| } |
| else { |
| double val = ret.quickGetValue(0, 0); |
| for( Future<MatrixBlock> rtask : rtasks ) { |
| double tmp = rtask.get().quickGetValue(0, 0); |
| val = op.aggOp.increOp.fn.execute(val, tmp); |
| } |
| ret.quickSetValue(0, 0, val); |
| } |
| } |
| } |
| catch(Exception ex) { |
| throw new DMLRuntimeException(ex); |
| } |
| } |
| else { |
| //process UC column group |
| for (ColGroup grp : _colGroups) |
| if( grp instanceof ColGroupUncompressed ) |
| grp.unaryAggregateOperations(op, ret); |
| |
| //process OLE/RLE column groups |
| aggregateUnaryOperations(op, _colGroups, ret, 0, rlen); |
| } |
| |
| //special handling zeros for rowmins/rowmax |
| if( op.indexFn instanceof ReduceCol && op.aggOp.increOp.fn instanceof Builtin ) { |
| int[] rnnz = new int[rlen]; |
| for( ColGroup grp : _colGroups ) |
| grp.countNonZerosPerRow(rnnz, 0, rlen); |
| Builtin builtin = (Builtin)op.aggOp.increOp.fn; |
| for( int i=0; i<rlen; i++ ) |
| if( rnnz[i] < clen ) |
| ret.quickSetValue(i, 0, builtin.execute(ret.quickGetValue(i, 0), 0)); |
| } |
| |
| //drop correction if necessary |
| if(op.aggOp.correctionExists && inCP) |
| ret.dropLastRowsOrColumns(op.aggOp.correctionLocation); |
| |
| //post-processing |
| ret.recomputeNonZeros(); |
| |
| if( LOG.isDebugEnabled() ) |
| LOG.debug("Compressed uagg k="+op.getNumThreads()+" in "+time.stop()); |
| |
| |
| return ret; |
| } |
| |
| @Override |
| public MatrixValue aggregateUnaryOperations(AggregateUnaryOperator op, |
| MatrixValue result, int blockingFactorRow, int blockingFactorCol, |
| MatrixIndexes indexesIn) { |
| return aggregateUnaryOperations(op, result, |
| blockingFactorRow, blockingFactorCol, indexesIn, false); |
| } |
| |
| private static void aggregateUnaryOperations(AggregateUnaryOperator op, |
| ArrayList<ColGroup> groups, MatrixBlock ret, int rl, int ru) |
| { |
| boolean cacheDDC1 = ColGroupValue.LOW_LEVEL_OPT |
| && op.indexFn instanceof ReduceCol && op.aggOp.increOp.fn instanceof KahanPlus //rowSums |
| && ColGroupOffset.ALLOW_CACHE_CONSCIOUS_ROWSUMS |
| && ru-rl > ColGroupOffset.WRITE_CACHE_BLKSZ/2; |
| |
| //process cache-conscious DDC1 groups (adds to output) |
| if( cacheDDC1 ) { |
| ArrayList<ColGroupDDC1> tmp = new ArrayList<>(); |
| for( ColGroup grp : groups ) |
| if( grp instanceof ColGroupDDC1 ) |
| tmp.add((ColGroupDDC1)grp); |
| if( !tmp.isEmpty() ) |
| ColGroupDDC1.computeRowSums(tmp.toArray(new ColGroupDDC1[0]), ret, |
| KahanPlus.getKahanPlusFnObject(), rl, ru); |
| } |
| |
| //process remaining groups (adds to output) |
| //note: UC group never passed into this function |
| for( ColGroup grp : groups ) |
| if( !(grp instanceof ColGroupUncompressed) |
| && !(cacheDDC1 && grp instanceof ColGroupDDC1) ) |
| ((ColGroupValue)grp).unaryAggregateOperations(op, ret, rl, ru); |
| } |
| |
| @Override |
| public MatrixBlock transposeSelfMatrixMultOperations(MatrixBlock out, MMTSJType tstype) |
| { |
| //call uncompressed matrix mult if necessary |
| if( !isCompressed() ) { |
| return super.transposeSelfMatrixMultOperations(out, tstype); |
| } |
| |
| //single-threaded tsmm of single uncompressed colgroup |
| if( isSingleUncompressedGroup() ){ |
| return ((ColGroupUncompressed)_colGroups.get(0)) |
| .getData().transposeSelfMatrixMultOperations(out, tstype); |
| } |
| |
| Timing time = LOG.isDebugEnabled() ? new Timing(true) : null; |
| |
| //check for transpose type |
| if( tstype != MMTSJType.LEFT ) //right not supported yet |
| throw new DMLRuntimeException("Invalid MMTSJ type '"+tstype.toString()+"'."); |
| |
| //create output matrix block |
| if( out == null ) |
| out = new MatrixBlock(clen, clen, false); |
| else |
| out.reset(clen, clen, false); |
| out.allocateDenseBlock(); |
| |
| if( !isEmptyBlock(false) ) { |
| //compute matrix mult |
| leftMultByTransposeSelf(_colGroups, out, 0, _colGroups.size()); |
| |
| // post-processing |
| out.setNonZeros(LinearAlgebraUtils |
| .copyUpperToLowerTriangle(out)); |
| } |
| |
| if( LOG.isDebugEnabled() ) |
| LOG.debug("Compressed TSMM in "+time.stop()); |
| |
| return out; |
| } |
| |
| |
| @Override |
| public MatrixBlock transposeSelfMatrixMultOperations(MatrixBlock out, MMTSJType tstype, int k) |
| { |
| //call uncompressed matrix mult if necessary |
| if( !isCompressed() ){ |
| return super.transposeSelfMatrixMultOperations(out, tstype, k); |
| } |
| |
| //multi-threaded tsmm of single uncompressed colgroup |
| if( isSingleUncompressedGroup() ){ |
| return ((ColGroupUncompressed)_colGroups.get(0)) |
| .getData().transposeSelfMatrixMultOperations(out, tstype, k); |
| } |
| |
| Timing time = LOG.isDebugEnabled() ? new Timing(true) : null; |
| |
| //check for transpose type |
| if( tstype != MMTSJType.LEFT ) //right not supported yet |
| throw new DMLRuntimeException("Invalid MMTSJ type '"+tstype.toString()+"'."); |
| |
| //create output matrix block |
| if( out == null ) |
| out = new MatrixBlock(clen, clen, false); |
| else |
| out.reset(clen, clen, false); |
| out.allocateDenseBlock(); |
| |
| if( !isEmptyBlock(false) ) { |
| //compute matrix mult |
| try { |
| ExecutorService pool = CommonThreadPool.get(k); |
| ArrayList<MatrixMultTransposeTask> tasks = new ArrayList<>(); |
| int numgrp = _colGroups.size(); |
| int blklen = (int)(Math.ceil((double)numgrp/(2*k))); |
| for( int i=0; i<2*k & i*blklen<clen; i++ ) |
| tasks.add(new MatrixMultTransposeTask(_colGroups, out, i*blklen, Math.min((i+1)*blklen, numgrp))); |
| List<Future<Object>> ret = pool.invokeAll(tasks); |
| for( Future<Object> tret : ret ) |
| tret.get(); //check for errors |
| pool.shutdown(); |
| } |
| catch(Exception ex) { |
| throw new DMLRuntimeException(ex); |
| } |
| |
| // post-processing |
| out.setNonZeros(LinearAlgebraUtils |
| .copyUpperToLowerTriangle(out)); |
| } |
| |
| if( LOG.isDebugEnabled() ) |
| LOG.debug("Compressed TSMM k="+k+" in "+time.stop()); |
| |
| return out; |
| } |
| |
| |
| /** |
| * Multiply this matrix block by a column vector on the right. |
| * |
| * @param vector |
| * right-hand operand of the multiplication |
| * @param result |
| * buffer to hold the result; must have the appropriate size |
| * already |
| */ |
| private void rightMultByVector(MatrixBlock vector, MatrixBlock result) |
| { |
| // initialize and allocate the result |
| result.allocateDenseBlock(); |
| |
| // delegate matrix-vector operation to each column group |
| rightMultByVector(_colGroups, vector, result, true, 0, result.getNumRows()); |
| |
| // post-processing |
| result.recomputeNonZeros(); |
| } |
| |
| /** |
| * Multi-threaded version of rightMultByVector. |
| * |
| * @param vector matrix block vector |
| * @param result matrix block result |
| * @param k number of threads |
| */ |
| private void rightMultByVector(MatrixBlock vector, MatrixBlock result, int k) |
| { |
| // initialize and allocate the result |
| result.allocateDenseBlock(); |
| |
| //multi-threaded execution of all groups |
| try { |
| ColGroupUncompressed uc = getUncompressedColGroup(); |
| |
| //compute uncompressed column group in parallel |
| if( uc != null ) |
| uc.rightMultByVector(vector, result, k); |
| |
| //compute remaining compressed column groups in parallel |
| ExecutorService pool = CommonThreadPool.get(k); |
| int rlen = getNumRows(); |
| int blklen = BitmapEncoder.getAlignedBlocksize( |
| (int)(Math.ceil((double)rlen/k))); |
| ArrayList<RightMatrixMultTask> tasks = new ArrayList<>(); |
| for( int i=0; i<k & i*blklen<getNumRows(); i++ ) |
| tasks.add(new RightMatrixMultTask(_colGroups, vector, result, i*blklen, Math.min((i+1)*blklen,rlen))); |
| List<Future<Long>> ret = pool.invokeAll(tasks); |
| pool.shutdown(); |
| |
| //error handling and nnz aggregation |
| long lnnz = 0; |
| for( Future<Long> tmp : ret ) |
| lnnz += tmp.get(); |
| result.setNonZeros(lnnz); |
| } |
| catch(Exception ex) { |
| throw new DMLRuntimeException(ex); |
| } |
| } |
| |
| private static void rightMultByVector(ArrayList<ColGroup> groups, MatrixBlock vect, MatrixBlock ret, boolean inclUC, int rl, int ru) |
| { |
| ColGroupValue.setupThreadLocalMemory(getMaxNumValues(groups)); |
| |
| boolean cacheDDC1 = ColGroupValue.LOW_LEVEL_OPT |
| && ru-rl > ColGroupOffset.WRITE_CACHE_BLKSZ; |
| |
| // process uncompressed column group (overwrites output) |
| if( inclUC ) { |
| for( ColGroup grp : groups ) |
| if( grp instanceof ColGroupUncompressed ) |
| grp.rightMultByVector(vect, ret, rl, ru); |
| } |
| |
| //process cache-conscious DDC1 groups (adds to output) |
| if( cacheDDC1 ) { |
| ArrayList<ColGroupDDC1> tmp = new ArrayList<>(); |
| for( ColGroup grp : groups ) |
| if( grp instanceof ColGroupDDC1 ) |
| tmp.add((ColGroupDDC1)grp); |
| if( !tmp.isEmpty() ) |
| ColGroupDDC1.rightMultByVector(tmp.toArray(new ColGroupDDC1[0]), vect, ret, rl, ru); |
| } |
| |
| //process remaining groups (adds to output) |
| for( ColGroup grp : groups ) |
| if( !(grp instanceof ColGroupUncompressed) |
| && !(cacheDDC1 && grp instanceof ColGroupDDC1) ) |
| grp.rightMultByVector(vect, ret, rl, ru); |
| |
| ColGroupValue.cleanupThreadLocalMemory(); |
| } |
| |
| /** |
| * Multiply this matrix block by the transpose of a column vector (i.e. |
| * t(v)%*%X) |
| * |
| * @param colGroups list of column groups |
| * @param vector |
| * left-hand operand of the multiplication |
| * @param result |
| * buffer to hold the result; must have the appropriate size |
| * already |
| * @param doTranspose if true, transpose vector |
| */ |
| private static void leftMultByVectorTranspose(List<ColGroup> colGroups, MatrixBlock vector, MatrixBlock result, boolean doTranspose, boolean allocTmp) |
| { |
| //transpose vector if required |
| MatrixBlock rowVector = vector; |
| if (doTranspose) { |
| rowVector = new MatrixBlock(1, vector.getNumRows(), false); |
| LibMatrixReorg.transpose(vector, rowVector); |
| } |
| |
| // initialize and allocate the result |
| result.reset(); |
| result.allocateDenseBlock(); |
| |
| // setup memory pool for reuse |
| if( allocTmp ) |
| ColGroupValue.setupThreadLocalMemory(getMaxNumValues(colGroups)); |
| |
| // delegate matrix-vector operation to each column group |
| for (ColGroup grp : colGroups) { |
| grp.leftMultByRowVector(rowVector, result); |
| } |
| |
| // post-processing |
| if( allocTmp ) |
| ColGroupValue.cleanupThreadLocalMemory(); |
| result.recomputeNonZeros(); |
| } |
| |
| private static void leftMultByVectorTranspose(List<ColGroup> colGroups, ColGroupDDC vector, MatrixBlock result) { |
| // initialize and allocate the result |
| result.reset(); |
| // delegate matrix-vector operation to each column group |
| for( ColGroup grp : colGroups ) |
| ((ColGroupValue)grp).leftMultByRowVector(vector, result); |
| // post-processing |
| result.recomputeNonZeros(); |
| } |
| |
| /** |
| * Multi-thread version of leftMultByVectorTranspose. |
| * |
| * @param colGroups list of column groups |
| * @param vector |
| * left-hand operand of the multiplication |
| * @param result |
| * buffer to hold the result; must have the appropriate size |
| * already |
| * @param doTranspose if true, transpose vector |
| * @param k number of threads |
| */ |
| private void leftMultByVectorTranspose(List<ColGroup> colGroups,MatrixBlock vector, MatrixBlock result, boolean doTranspose, int k) |
| { |
| //transpose vector if required |
| MatrixBlock rowVector = vector; |
| if (doTranspose) { |
| rowVector = new MatrixBlock(1, vector.getNumRows(), false); |
| LibMatrixReorg.transpose(vector, rowVector); |
| } |
| |
| // initialize and allocate the result |
| result.reset(); |
| result.allocateDenseBlock(); |
| |
| //multi-threaded execution |
| try { |
| //compute uncompressed column group in parallel |
| ColGroupUncompressed uc = getUncompressedColGroup(); |
| if( uc != null ) |
| uc.leftMultByRowVector(vector, result, k); |
| |
| //compute remaining compressed column groups in parallel |
| ExecutorService pool = CommonThreadPool.get( Math.min(colGroups.size()-((uc!=null)?1:0), k) ); |
| ArrayList<ColGroup>[] grpParts = createStaticTaskPartitioning(4*k, false); |
| ArrayList<LeftMatrixMultTask> tasks = new ArrayList<>(); |
| for( ArrayList<ColGroup> groups : grpParts ) |
| tasks.add(new LeftMatrixMultTask(groups, rowVector, result)); |
| List<Future<Object>> ret = pool.invokeAll(tasks); |
| pool.shutdown(); |
| for( Future<Object> tmp : ret ) |
| tmp.get(); //error handling |
| } |
| catch(Exception ex) { |
| throw new DMLRuntimeException(ex); |
| } |
| |
| // post-processing |
| result.recomputeNonZeros(); |
| } |
| |
| private static void leftMultByTransposeSelf(ArrayList<ColGroup> groups, MatrixBlock result, int gl, int gu) |
| { |
| final int numRows = groups.get(0).getNumRows(); |
| final int numGroups = groups.size(); |
| final boolean containsUC = containsUncompressedColGroup(groups); |
| |
| //preallocated dense tmp matrix blocks |
| MatrixBlock lhs = new MatrixBlock(1, numRows, false); |
| MatrixBlock tmpret = new MatrixBlock(1, result.getNumColumns(), false); |
| lhs.allocateDenseBlock(); |
| tmpret.allocateDenseBlock(); |
| |
| // setup memory pool for reuse |
| ColGroupValue.setupThreadLocalMemory(getMaxNumValues(groups)); |
| |
| //approach: for each colgroup, extract uncompressed columns one at-a-time |
| //vector-matrix multiplies against remaining col groups |
| for( int i=gl; i<gu; i++ ) |
| { |
| //get current group and relevant col groups |
| ColGroup group = groups.get(i); |
| int[] ixgroup = group.getColIndices(); |
| List<ColGroup> tmpList = groups.subList(i, numGroups); |
| |
| if( group instanceof ColGroupDDC //single DDC group |
| && ixgroup.length==1 && !containsUC && numRows<BitmapEncoder.BITMAP_BLOCK_SZ ) |
| { |
| //compute vector-matrix partial result |
| leftMultByVectorTranspose(tmpList, (ColGroupDDC)group, tmpret); |
| |
| //write partial results (disjoint non-zeros) |
| LinearAlgebraUtils.copyNonZerosToUpperTriangle(result, tmpret, ixgroup[0]); |
| } |
| else { |
| //for all uncompressed lhs columns vectors |
| for( int j=0; j<ixgroup.length; j++ ) { |
| group.decompressToBlock(lhs, j); |
| |
| if( !lhs.isEmptyBlock(false) ) { |
| //compute vector-matrix partial result |
| leftMultByVectorTranspose(tmpList, lhs, tmpret, false, false); |
| |
| //write partial results (disjoint non-zeros) |
| LinearAlgebraUtils.copyNonZerosToUpperTriangle(result, tmpret, ixgroup[j]); |
| } |
| } |
| } |
| } |
| |
| //post processing |
| ColGroupValue.cleanupThreadLocalMemory(); |
| } |
| |
| @SuppressWarnings("unchecked") |
| private ArrayList<ColGroup>[] createStaticTaskPartitioning(int k, boolean inclUncompressed) |
| { |
| // special case: single uncompressed col group |
| if( _colGroups.size()==1 && _colGroups.get(0) instanceof ColGroupUncompressed ){ |
| return new ArrayList[0]; |
| } |
| |
| // initialize round robin col group distribution |
| // (static task partitioning to reduce mem requirements/final agg) |
| int numTasks = Math.min(k, _colGroups.size()); |
| ArrayList<ColGroup>[] grpParts = new ArrayList[numTasks]; |
| int pos = 0; |
| for( ColGroup grp : _colGroups ){ |
| if( grpParts[pos]==null ) |
| grpParts[pos] = new ArrayList<>(); |
| if( inclUncompressed || !(grp instanceof ColGroupUncompressed) ) { |
| grpParts[pos].add(grp); |
| pos = (pos==numTasks-1) ? 0 : pos+1; |
| } |
| } |
| |
| return grpParts; |
| } |
| |
| private static int getMaxNumValues(List<ColGroup> groups) { |
| int numVals = 1; |
| for( ColGroup grp : groups ) |
| if( grp instanceof ColGroupValue ) |
| numVals = Math.max(numVals, |
| ((ColGroupValue)grp).getNumValues()); |
| return numVals; |
| } |
| |
| public boolean hasUncompressedColGroup() { |
| return getUncompressedColGroup() != null; |
| } |
| |
| private ColGroupUncompressed getUncompressedColGroup() { |
| for( ColGroup grp : _colGroups ) |
| if( grp instanceof ColGroupUncompressed ) |
| return (ColGroupUncompressed)grp; |
| |
| return null; |
| } |
| |
| private static boolean containsUncompressedColGroup(ArrayList<ColGroup> groups) { |
| for( ColGroup grp : groups ) |
| if( grp instanceof ColGroupUncompressed ) |
| return true; |
| return false; |
| } |
| |
| private static class LeftMatrixMultTask implements Callable<Object> |
| { |
| private final ArrayList<ColGroup> _groups; |
| private final MatrixBlock _vect; |
| private final MatrixBlock _ret; |
| |
| protected LeftMatrixMultTask( ArrayList<ColGroup> groups, MatrixBlock vect, MatrixBlock ret) { |
| _groups = groups; |
| _vect = vect; |
| _ret = ret; |
| } |
| |
| @Override |
| public Object call() |
| { |
| // setup memory pool for reuse |
| ColGroupValue.setupThreadLocalMemory(getMaxNumValues(_groups)); |
| |
| // delegate matrix-vector operation to each column group |
| for(ColGroup grp : _groups) |
| grp.leftMultByRowVector(_vect, _ret); |
| |
| ColGroupValue.cleanupThreadLocalMemory(); |
| return null; |
| } |
| } |
| |
| private static class RightMatrixMultTask implements Callable<Long> |
| { |
| private final ArrayList<ColGroup> _groups; |
| private final MatrixBlock _vect; |
| private final MatrixBlock _ret; |
| private final int _rl; |
| private final int _ru; |
| |
| protected RightMatrixMultTask( ArrayList<ColGroup> groups, MatrixBlock vect, MatrixBlock ret, int rl, int ru) { |
| _groups = groups; |
| _vect = vect; |
| _ret = ret; |
| _rl = rl; |
| _ru = ru; |
| } |
| |
| @Override |
| public Long call() { |
| rightMultByVector(_groups, _vect, _ret, false, _rl, _ru); |
| return _ret.recomputeNonZeros(_rl, _ru-1, 0, 0); |
| } |
| } |
| |
| private static class MatrixMultTransposeTask implements Callable<Object> |
| { |
| private final ArrayList<ColGroup> _groups; |
| private final MatrixBlock _ret; |
| private final int _gl; |
| private final int _gu; |
| |
| protected MatrixMultTransposeTask(ArrayList<ColGroup> groups, MatrixBlock ret, int gl, int gu) { |
| _groups = groups; |
| _ret = ret; |
| _gl = gl; |
| _gu = gu; |
| } |
| |
| @Override |
| public Object call() { |
| leftMultByTransposeSelf(_groups, _ret, _gl, _gu); |
| return null; |
| } |
| } |
| |
| private static class UnaryAggregateTask implements Callable<MatrixBlock> |
| { |
| private final ArrayList<ColGroup> _groups; |
| private final int _rl; |
| private final int _ru; |
| private final MatrixBlock _ret; |
| private final AggregateUnaryOperator _op; |
| |
| protected UnaryAggregateTask( ArrayList<ColGroup> groups, MatrixBlock ret, int rl, int ru, AggregateUnaryOperator op) { |
| _groups = groups; |
| _op = op; |
| _rl = rl; |
| _ru = ru; |
| |
| if( _op.indexFn instanceof ReduceAll ) { //sum |
| _ret = new MatrixBlock(ret.getNumRows(), ret.getNumColumns(), false); |
| _ret.allocateDenseBlock(); |
| if( _op.aggOp.increOp.fn instanceof Builtin ) |
| System.arraycopy(ret.getDenseBlockValues(), 0, |
| _ret.getDenseBlockValues(), 0, ret.getNumRows()*ret.getNumColumns()); |
| } |
| else { //colSums |
| _ret = ret; |
| } |
| } |
| |
| @Override |
| public MatrixBlock call() { |
| aggregateUnaryOperations(_op, _groups, _ret, _rl, _ru); |
| return _ret; |
| } |
| } |
| |
| private static class SizeEstimTask implements Callable<CompressedSizeInfo> |
| { |
| private final CompressedSizeEstimator _estim; |
| private final int _col; |
| |
| protected SizeEstimTask( CompressedSizeEstimator estim, int col ) { |
| _estim = estim; |
| _col = col; |
| } |
| |
| @Override |
| public CompressedSizeInfo call() { |
| return _estim.estimateCompressedColGroupSize(new int[] { _col }); |
| } |
| } |
| |
| /** |
| * Callable that forwards its stored arguments to compressColGroup |
| * and returns the resulting column group. |
| */ |
| private static class CompressTask implements Callable<ColGroup> |
| { |
| private final MatrixBlock _in; |
| private final int _rlen; |
| private final int[] _colIndexes; |
| private final CompressedSizeEstimator _estim; |
| private final HashMap<Integer, Double> _compRatios; |
| private final boolean _denseEst; |
| |
| protected CompressTask( MatrixBlock in, CompressedSizeEstimator estim, HashMap<Integer, Double> compRatios, int rlen, int[] colIndexes, boolean denseEst ) { |
| _in = in; |
| _rlen = rlen; |
| _colIndexes = colIndexes; |
| _estim = estim; |
| _compRatios = compRatios; |
| _denseEst = denseEst; |
| } |
| |
| @Override |
| public ColGroup call() { |
| final ColGroup grp = compressColGroup(_in, _estim, _compRatios, _rlen, _colIndexes, _denseEst); |
| return grp; |
| } |
| } |
| |
| /** |
| * Callable that decompresses the rows between rl and ru of all given |
| * column groups into the target block. Always returns null. |
| */ |
| private static class DecompressTask implements Callable<Object> |
| { |
| private final List<ColGroup> _colGroups; |
| private final MatrixBlock _ret; |
| private final int _rl; |
| private final int _ru; |
| |
| protected DecompressTask( List<ColGroup> colGroups, MatrixBlock ret, int rl, int ru ) { |
| _ret = ret; |
| _colGroups = colGroups; |
| _rl = rl; |
| _ru = ru; |
| } |
| |
| @Override |
| public Object call() { |
| |
| //sparse target: size every row up front to avoid repeated allocation |
| if( _ret.isInSparseFormat() ) { |
| final int[] nnzPerRow = new int[_ru-_rl]; |
| for( ColGroup cg : _colGroups ) |
| cg.countNonZerosPerRow(nnzPerRow, _rl, _ru); |
| final SparseBlock sblock = _ret.getSparseBlock(); |
| for( int off=0; off<nnzPerRow.length; off++ ) |
| sblock.allocate(_rl+off, nnzPerRow[off]); |
| } |
| |
| //write the row partition of each group into the target |
| for( ColGroup cg : _colGroups ) |
| cg.decompressToBlock(_ret, _rl, _ru); |
| |
| //sparse target: values were appended, so sort the rows afterwards |
| if( _ret.isInSparseFormat() ) |
| _ret.sortSparseRows(_rl, _ru); |
| |
| return null; |
| } |
| } |
| |
| ////////////////////////////////////////// |
| // Graceful fallback to uncompressed linear algebra |
| |
| /** |
| * Fallback for unary operations: runs the uncompressed implementation, |
| * decompressing this block first if it is compressed. |
| * |
| * @param op unary operator, passed through |
| * @param result output block, passed through |
| * @return whatever the uncompressed implementation returns |
| */ |
| @Override |
| public MatrixValue unaryOperations(UnaryOperator op, MatrixValue result) { |
| printDecompressWarning("unaryOperations"); |
| //not compressed: invoke the parent implementation explicitly; calling the |
| //method on 'this' would re-enter this override and never terminate |
| if( !isCompressed() ) |
| return super.unaryOperations(op, result); |
| return decompress().unaryOperations(op, result); |
| } |
| |
| /** |
| * Fallback for binary operations: both operands are brought into |
| * uncompressed form and the uncompressed implementation is used. |
| * |
| * @param op binary operator, passed through |
| * @param thatValue right-hand side; must be a MatrixBlock (it is cast) |
| * @param result output block, passed through |
| * @return whatever the uncompressed implementation returns |
| */ |
| @Override |
| public MatrixValue binaryOperations(BinaryOperator op, MatrixValue thatValue, MatrixValue result) { |
| printDecompressWarning("binaryOperations", (MatrixBlock)thatValue); |
| //not compressed: invoke the parent implementation explicitly; calling the |
| //method on 'this' would re-enter this override and never terminate |
| if( !isCompressed() ) |
| return super.binaryOperations(op, getUncompressed(thatValue), result); |
| MatrixBlock left = decompress(); |
| MatrixBlock right = getUncompressed(thatValue); |
| return left.binaryOperations(op, right, result); |
| } |
| |
| /** |
| * Fallback for in-place binary operations on uncompressed operands. |
| * |
| * @param op binary operator, passed through |
| * @param thatValue right-hand side; must be a MatrixBlock (it is cast) |
| */ |
| @Override |
| public void binaryOperationsInPlace(BinaryOperator op, MatrixValue thatValue) { |
| printDecompressWarning("binaryOperationsInPlace", (MatrixBlock)thatValue); |
| //not compressed: invoke the parent implementation explicitly; calling the |
| //method on 'this' would re-enter this override and never terminate |
| if( !isCompressed() ) { |
| super.binaryOperationsInPlace(op, getUncompressed(thatValue)); |
| return; |
| } |
| //NOTE(review): in the compressed case the operation is applied to the block |
| //returned by decompress(); this method does not store that block anywhere, |
| //so confirm that the result is not expected to be visible in this block |
| MatrixBlock left = decompress(); |
| MatrixBlock right = getUncompressed(thatValue); |
| left.binaryOperationsInPlace(op, right); |
| } |
| |
| /** |
| * Not supported by this class; always throws. |
| * |
| * @throws DMLRuntimeException on every call |
| */ |
| @Override |
| public void incrementalAggregate(AggregateOperator aggOp, MatrixValue correction, MatrixValue newWithCorrection, boolean deep) { |
| final String msg = "CompressedMatrixBlock: incrementalAggregate not supported."; |
| throw new DMLRuntimeException(msg); |
| } |
| |
| /** |
| * Not supported by this class; always throws. |
| * |
| * @throws DMLRuntimeException on every call |
| */ |
| @Override |
| public void incrementalAggregate(AggregateOperator aggOp, MatrixValue newWithCorrection) { |
| final String msg = "CompressedMatrixBlock: incrementalAggregate not supported."; |
| throw new DMLRuntimeException(msg); |
| } |
| |
| /** |
| * Fallback for reorg operations: runs the uncompressed implementation, |
| * decompressing this block first if it is compressed. |
| * All arguments are passed through unchanged. |
| * |
| * @return whatever the uncompressed implementation returns |
| */ |
| @Override |
| public MatrixValue reorgOperations(ReorgOperator op, MatrixValue ret, int startRow, int startColumn, int length) { |
| printDecompressWarning("reorgOperations"); |
| //not compressed: invoke the parent implementation explicitly; calling the |
| //method on 'this' would re-enter this override and never terminate |
| if( !isCompressed() ) |
| return super.reorgOperations(op, ret, startRow, startColumn, length); |
| return decompress().reorgOperations(op, ret, startRow, startColumn, length); |
| } |
| |
| /** |
| * Appends another block. With cbind=true the two-argument append of this |
| * class is used; otherwise both operands are brought into uncompressed |
| * form and the uncompressed implementation is used. |
| * |
| * @param that block to append |
| * @param ret output block, passed through |
| * @param cbind true to take the two-argument append path |
| * @return the appended block |
| */ |
| @Override |
| public MatrixBlock append(MatrixBlock that, MatrixBlock ret, boolean cbind) { |
| if( cbind ) //use supported operation |
| return append(that, ret); |
| printDecompressWarning("append-rbind", that); |
| //not compressed: invoke the parent implementation explicitly; calling the |
| //method on 'this' would re-enter this override and never terminate |
| if( !isCompressed() ) |
| return super.append(getUncompressed(that), ret, cbind); |
| MatrixBlock left = decompress(); |
| MatrixBlock right = getUncompressed(that); |
| return left.append(right, ret, cbind); |
| } |
| |
| /** |
| * Fallback for the blocked append: both operands are brought into |
| * uncompressed form and the uncompressed implementation is used. |
| * All remaining arguments are passed through unchanged. |
| * |
| * @param v2 block to append; must be a MatrixBlock (it is cast) |
| */ |
| @Override |
| public void append(MatrixValue v2, |
| ArrayList<IndexedMatrixValue> outlist, int blockRowFactor, |
| int blockColFactor, boolean cbind, boolean m2IsLast, int nextNCol) { |
| printDecompressWarning("append", (MatrixBlock)v2); |
| //not compressed: invoke the parent implementation explicitly; calling the |
| //method on 'this' would re-enter this override and never terminate |
| if( !isCompressed() ) { |
| super.append(getUncompressed(v2), outlist, blockRowFactor, blockColFactor, cbind, m2IsLast, nextNCol); |
| return; |
| } |
| MatrixBlock left = decompress(); |
| MatrixBlock right = getUncompressed(v2); |
| left.append(right, outlist, blockRowFactor, blockColFactor, cbind, m2IsLast, nextNCol); |
| } |
| |
| /** |
| * Convenience overload: delegates to the four-argument variant with k=1. |
| */ |
| @Override |
| public void permutationMatrixMultOperations(MatrixValue m2Val, MatrixValue out1Val, MatrixValue out2Val) { |
| final int defaultK = 1; |
| permutationMatrixMultOperations(m2Val, out1Val, out2Val, defaultK); |
| } |
| |
| /** |
| * Fallback for permutation matrix multiplication: both operands are brought |
| * into uncompressed form and the uncompressed implementation is used. |
| * |
| * @param m2Val right-hand side; must be a MatrixBlock (it is cast) |
| * @param out1Val first output, passed through |
| * @param out2Val second output, passed through |
| * @param k passed through to the uncompressed implementation |
| */ |
| @Override |
| public void permutationMatrixMultOperations(MatrixValue m2Val, MatrixValue out1Val, MatrixValue out2Val, int k) { |
| printDecompressWarning("permutationMatrixMultOperations", (MatrixBlock)m2Val); |
| //not compressed: invoke the parent implementation explicitly; calling the |
| //method on 'this' would re-enter this override and never terminate |
| if( !isCompressed() ) { |
| super.permutationMatrixMultOperations(getUncompressed(m2Val), out1Val, out2Val, k); |
| return; |
| } |
| MatrixBlock left = decompress(); |
| MatrixBlock right = getUncompressed(m2Val); |
| left.permutationMatrixMultOperations(right, out1Val, out2Val, k); |
| } |
| |
| /** |
| * Fallback for left indexing with a matrix right-hand side: both operands |
| * are brought into uncompressed form and the uncompressed implementation |
| * is used. Index bounds, ret and update are passed through unchanged. |
| * |
| * @return whatever the uncompressed implementation returns |
| */ |
| @Override |
| public MatrixBlock leftIndexingOperations(MatrixBlock rhsMatrix, int rl, int ru, int cl, int cu, MatrixBlock ret, UpdateType update) { |
| printDecompressWarning("leftIndexingOperations"); |
| //not compressed: invoke the parent implementation explicitly; calling the |
| //method on 'this' would re-enter this override and never terminate |
| if( !isCompressed() ) |
| return super.leftIndexingOperations(getUncompressed(rhsMatrix), rl, ru, cl, cu, ret, update); |
| MatrixBlock left = decompress(); |
| MatrixBlock right = getUncompressed(rhsMatrix); |
| return left.leftIndexingOperations(right, rl, ru, cl, cu, ret, update); |
| } |
| |
| /** |
| * Fallback for left indexing with a scalar: runs the uncompressed |
| * implementation, decompressing this block first if it is compressed. |
| * All arguments are passed through unchanged. |
| * |
| * @return whatever the uncompressed implementation returns |
| */ |
| @Override |
| public MatrixBlock leftIndexingOperations(ScalarObject scalar, int rl, int cl, MatrixBlock ret, UpdateType update) { |
| printDecompressWarning("leftIndexingOperations"); |
| //not compressed: invoke the parent implementation explicitly; calling the |
| //method on 'this' would re-enter this override and never terminate |
| if( !isCompressed() ) |
| return super.leftIndexingOperations(scalar, rl, cl, ret, update); |
| return decompress().leftIndexingOperations(scalar, rl, cl, ret, update); |
| } |
| |
| /** |
| * Fallback for slicing: runs the uncompressed implementation, |
| * decompressing this block first if it is compressed. |
| * All arguments are passed through unchanged. |
| * |
| * @return whatever the uncompressed implementation returns |
| */ |
| @Override |
| public MatrixBlock slice(int rl, int ru, int cl, int cu, CacheBlock ret) { |
| printDecompressWarning("slice"); |
| //not compressed: invoke the parent implementation explicitly; calling the |
| //method on 'this' would re-enter this override and never terminate |
| if( !isCompressed() ) |
| return super.slice(rl, ru, cl, cu, ret); |
| return decompress().slice(rl, ru, cl, cu, ret); |
| } |
| |
| /** |
| * Fallback for the blocked slice: runs the uncompressed implementation, |
| * decompressing this block first if it is compressed. |
| * All arguments are passed through unchanged. |
| * |
| * @throws RuntimeException wrapping any DMLRuntimeException raised on the way |
| */ |
| @Override |
| public void slice(ArrayList<IndexedMatrixValue> outlist, IndexRange range, |
| int rowCut, int colCut, int normalBlockRowFactor, |
| int normalBlockColFactor, int boundaryRlen, int boundaryClen) { |
| printDecompressWarning("slice"); |
| try { |
| if( isCompressed() ) { |
| decompress().slice(outlist, range, rowCut, colCut, normalBlockRowFactor, |
| normalBlockColFactor, boundaryRlen, boundaryClen); |
| } |
| else { |
| //not compressed: invoke the parent implementation explicitly; calling the |
| //method on 'this' would re-enter this override and never terminate |
| super.slice(outlist, range, rowCut, colCut, normalBlockRowFactor, |
| normalBlockColFactor, boundaryRlen, boundaryClen); |
| } |
| } |
| catch(DMLRuntimeException ex) { |
| throw new RuntimeException(ex); |
| } |
| } |
| |
| /** |
| * Fallback for zero-out operations: runs the uncompressed implementation, |
| * decompressing this block first if it is compressed. |
| * All arguments are passed through unchanged. |
| * |
| * @return whatever the uncompressed implementation returns |
| */ |
| @Override |
| public MatrixValue zeroOutOperations(MatrixValue result, IndexRange range, boolean complementary) { |
| printDecompressWarning("zeroOutOperations"); |
| //not compressed: invoke the parent implementation explicitly; calling the |
| //method on 'this' would re-enter this override and never terminate |
| if( !isCompressed() ) |
| return super.zeroOutOperations(result, range, complementary); |
| return decompress().zeroOutOperations(result, range, complementary); |
| } |
| |
| /** |
| * Central moment computation without explicit weights. |
| * Uncompressed or empty blocks use the parent implementation. Otherwise |
| * only the first column group is consulted: an uncompressed group delegates |
| * to its data block, any other group is treated as a ColGroupValue whose |
| * value block is aggregated with its counts as weights. |
| * |
| * @param op central moment operator, passed through |
| * @return result object of the delegated cmOperations call |
| */ |
| @Override |
| public CM_COV_Object cmOperations(CMOperator op) { |
| printDecompressWarning("cmOperations"); |
| if( isCompressed() && !isEmptyBlock() ) { |
| final ColGroup first = _colGroups.get(0); |
| if( first instanceof ColGroupUncompressed ) { |
| MatrixBlock data = ((ColGroupUncompressed)first).getData(); |
| return data.cmOperations(op); |
| } |
| |
| //value-based group: use the counts from getCounts(true) as weights |
| final ColGroupValue valGrp = (ColGroupValue)first; |
| final MatrixBlock values = valGrp.getValuesAsBlock(); |
| final MatrixBlock weights = ColGroupValue.getCountsAsBlock(valGrp.getCounts(true)); |
| return values.cmOperations(op, weights); |
| } |
| return super.cmOperations(op); |
| } |
| |
| /** |
| * Central moment computation with explicit weights. |
| * Uncompressed or empty blocks use the parent implementation. Otherwise |
| * the first column group is consulted: an uncompressed group delegates to |
| * its data block, any other group falls back to full decompression. |
| * The weights are applied on every path. |
| * |
| * @param op central moment operator, passed through |
| * @param weights weights block; decompressed first if it is compressed |
| * @return result object of the delegated cmOperations call |
| */ |
| @Override |
| public CM_COV_Object cmOperations(CMOperator op, MatrixBlock weights) { |
| printDecompressWarning("cmOperations"); |
| MatrixBlock right = getUncompressed(weights); |
| if( !isCompressed() || isEmptyBlock() ) |
| return super.cmOperations(op, right); |
| ColGroup grp = _colGroups.get(0); |
| //hand the weights on as well; the unweighted overload would ignore them |
| if( grp instanceof ColGroupUncompressed ) |
| return ((ColGroupUncompressed)grp).getData().cmOperations(op, right); |
| return decompress().cmOperations(op, right); |
| } |
| |
| /** |
| * Fallback for covariance: both operands are brought into uncompressed |
| * form and the uncompressed implementation is used. |
| * |
| * @param op covariance operator, passed through |
| * @param that second operand; decompressed first if it is compressed |
| * @return whatever the uncompressed implementation returns |
| */ |
| @Override |
| public CM_COV_Object covOperations(COVOperator op, MatrixBlock that) { |
| printDecompressWarning("covOperations"); |
| //not compressed: invoke the parent implementation explicitly; calling the |
| //method on 'this' would re-enter this override and never terminate |
| if( !isCompressed() ) |
| return super.covOperations(op, getUncompressed(that)); |
| MatrixBlock left = decompress(); |
| MatrixBlock right = getUncompressed(that); |
| return left.covOperations(op, right); |
| } |
| |
| /** |
| * Fallback for weighted covariance: all operands are brought into |
| * uncompressed form and the uncompressed implementation is used. |
| * |
| * @param op covariance operator, passed through |
| * @param that second operand; decompressed first if it is compressed |
| * @param weights weights block; decompressed first if it is compressed |
| * @return whatever the uncompressed implementation returns |
| */ |
| @Override |
| public CM_COV_Object covOperations(COVOperator op, MatrixBlock that, MatrixBlock weights) { |
| printDecompressWarning("covOperations"); |
| //not compressed: invoke the parent implementation explicitly; calling the |
| //method on 'this' would re-enter this override and never terminate |
| if( !isCompressed() ) { |
| MatrixBlock other = getUncompressed(that); |
| MatrixBlock wghts = getUncompressed(weights); |
| return super.covOperations(op, other, wghts); |
| } |
| MatrixBlock left = decompress(); |
| MatrixBlock right1 = getUncompressed(that); |
| MatrixBlock right2 = getUncompressed(weights); |
| return left.covOperations(op, right1, right2); |
| } |
| |
| @Override |
| public MatrixValue sortOperations(MatrixValue weights, MatrixValue result) { |
| printDecompressWarning("sortOperations"); |
| MatrixBlock right = getUncompressed(weights); |
| if( !isCompressed() ) |
| return super.sortOperations(right, result); |
| ColGroup grp = _colGroups.get(0); |
| if( grp instanceof ColGroupUncompressed ) |
| return ((ColGroupUncompressed)grp).getData().sortOperations(right, result); |
| |
| if( right == null ) { |
| ColGroupValue grpVal = (ColGroupValue)grp; |
| MatrixBlock vals = grpVal.getValuesAsBlock(); |
| int[] counts = grpVal.getCounts(true); |
| double[] data = (vals.getDenseBlock()!=null) ? vals.getDenseBlockValues() : null; |
| SortUtils.sortByValue(0, vals.getNumRows(), data, counts); |
| MatrixBlock counts2 = ColGroupValue.getCountsAsBlock(counts); |
| return vals.sortOperations(counts2, result); |
| } |
| else |
| return decompress().sortOperations(right, result); |
| } |
| |
| /** |
| * Fallback for aggregate binary operations on uncompressed operands. |
| * This block (decompressed if necessary) is used as the left operand; |
| * the m1Value argument itself is not read. |
| * |
| * @param m2Value right operand; decompressed first if it is compressed |
| * @return whatever the uncompressed implementation returns |
| */ |
| @Override |
| public MatrixBlock aggregateBinaryOperations(MatrixIndexes m1Index, |
| MatrixBlock m1Value, MatrixIndexes m2Index, MatrixBlock m2Value, |
| MatrixBlock result, AggregateBinaryOperator op) { |
| printDecompressWarning("aggregateBinaryOperations"); |
| //not compressed: invoke the parent implementation explicitly; calling the |
| //method on 'this' would re-enter this override and never terminate |
| if( !isCompressed() ) |
| return super.aggregateBinaryOperations(m1Index, this, m2Index, getUncompressed(m2Value), result, op); |
| MatrixBlock left = decompress(); |
| MatrixBlock right = getUncompressed(m2Value); |
| return left.aggregateBinaryOperations(m1Index, left, m2Index, right, result, op); |
| } |
| |
| /** |
| * Fallback for aggregate ternary operations on uncompressed operands. |
| * This block (decompressed if necessary) is used as the first operand; |
| * the m1 argument itself is not read. |
| * |
| * @param m2 second operand; decompressed first if it is compressed |
| * @param m3 third operand; decompressed first if it is compressed |
| * @return whatever the uncompressed implementation returns |
| */ |
| @Override |
| public MatrixBlock aggregateTernaryOperations(MatrixBlock m1, MatrixBlock m2, MatrixBlock m3, MatrixBlock ret, AggregateTernaryOperator op, boolean inCP) { |
| printDecompressWarning("aggregateTernaryOperations"); |
| //not compressed: invoke the parent implementation explicitly; calling the |
| //method on 'this' would re-enter this override and never terminate |
| if( !isCompressed() ) { |
| MatrixBlock second = getUncompressed(m2); |
| MatrixBlock third = getUncompressed(m3); |
| return super.aggregateTernaryOperations(this, second, third, ret, op, inCP); |
| } |
| MatrixBlock left = decompress(); |
| MatrixBlock right1 = getUncompressed(m2); |
| MatrixBlock right2 = getUncompressed(m3); |
| return left.aggregateTernaryOperations(left, right1, right2, ret, op, inCP); |
| } |
| |
| /** |
| * Fallback for unary-aggregate outer chains on uncompressed operands. |
| * This block (decompressed if necessary) is used as the left operand; |
| * the mbLeft argument itself is not read. |
| * |
| * @param mbRight right operand; decompressed first if it is compressed |
| * @return whatever the uncompressed implementation returns |
| */ |
| @Override |
| public MatrixBlock uaggouterchainOperations(MatrixBlock mbLeft, MatrixBlock mbRight, |
| MatrixBlock mbOut, BinaryOperator bOp, AggregateUnaryOperator uaggOp) { |
| printDecompressWarning("uaggouterchainOperations"); |
| //not compressed: invoke the parent implementation explicitly; calling the |
| //method on 'this' would re-enter this override and never terminate |
| if( !isCompressed() ) |
| return super.uaggouterchainOperations(this, getUncompressed(mbRight), mbOut, bOp, uaggOp); |
| MatrixBlock left = decompress(); |
| MatrixBlock right = getUncompressed(mbRight); |
| return left.uaggouterchainOperations(left, right, mbOut, bOp, uaggOp); |
| } |
| |
| /** |
| * Convenience overload: delegates to the six-argument variant with k=1. |
| */ |
| @Override |
| public MatrixBlock groupedAggOperations(MatrixValue tgt, MatrixValue wghts, MatrixValue ret, int ngroups, Operator op) { |
| final int defaultK = 1; |
| return groupedAggOperations(tgt, wghts, ret, ngroups, op, defaultK); |
| } |
| |
| /** |
| * Fallback for grouped aggregates on uncompressed operands. |
| * This block (decompressed if necessary) is used as the target; |
| * the tgt argument itself is not read. |
| * |
| * @param wghts weights; decompressed first if compressed |
| * @return whatever the uncompressed implementation returns |
| */ |
| @Override |
| public MatrixBlock groupedAggOperations(MatrixValue tgt, MatrixValue wghts, |
| MatrixValue ret, int ngroups, Operator op, int k) { |
| printDecompressWarning("groupedAggOperations"); |
| //not compressed: invoke the parent implementation explicitly; calling the |
| //method on 'this' would re-enter this override and never terminate |
| if( !isCompressed() ) |
| return super.groupedAggOperations(this, getUncompressed(wghts), ret, ngroups, op, k); |
| MatrixBlock left = decompress(); |
| MatrixBlock right = getUncompressed(wghts); |
| return left.groupedAggOperations(left, right, ret, ngroups, op, k); |
| } |
| |
| /** |
| * Fallback for remove-empty with a select vector: runs the uncompressed |
| * implementation, decompressing this block first if it is compressed. |
| * All arguments are passed through unchanged. |
| * |
| * @return whatever the uncompressed implementation returns |
| */ |
| @Override |
| public MatrixBlock removeEmptyOperations(MatrixBlock ret, boolean rows, boolean emptyReturn, MatrixBlock select) { |
| printDecompressWarning("removeEmptyOperations"); |
| //not compressed: invoke the parent implementation explicitly; calling the |
| //method on 'this' would re-enter this override and never terminate |
| if( !isCompressed() ) |
| return super.removeEmptyOperations(ret, rows, emptyReturn, select); |
| return decompress().removeEmptyOperations(ret, rows, emptyReturn, select); |
| } |
| |
| /** |
| * Fallback for remove-empty: runs the uncompressed implementation, |
| * decompressing this block first if it is compressed. |
| * All arguments are passed through unchanged. |
| * |
| * @return whatever the uncompressed implementation returns |
| */ |
| @Override |
| public MatrixBlock removeEmptyOperations(MatrixBlock ret, boolean rows, boolean emptyReturn) { |
| printDecompressWarning("removeEmptyOperations"); |
| //not compressed: invoke the parent implementation explicitly; calling the |
| //method on 'this' would re-enter this override and never terminate |
| if( !isCompressed() ) |
| return super.removeEmptyOperations(ret, rows, emptyReturn); |
| return decompress().removeEmptyOperations(ret, rows, emptyReturn); |
| } |
| |
| /** |
| * Fallback for rexpand: runs the uncompressed implementation, |
| * decompressing this block first if it is compressed. |
| * All arguments are passed through unchanged. |
| * |
| * @return whatever the uncompressed implementation returns |
| */ |
| @Override |
| public MatrixBlock rexpandOperations(MatrixBlock ret, double max, |
| boolean rows, boolean cast, boolean ignore, int k) { |
| printDecompressWarning("rexpandOperations"); |
| //not compressed: invoke the parent implementation explicitly; calling the |
| //method on 'this' would re-enter this override and never terminate |
| if( !isCompressed() ) |
| return super.rexpandOperations(ret, max, rows, cast, ignore, k); |
| return decompress().rexpandOperations(ret, max, rows, cast, ignore, k); |
| } |
| |
| /** |
| * Fallback for replace: runs the uncompressed implementation, |
| * decompressing this block first if it is compressed. |
| * All arguments are passed through unchanged. |
| * |
| * @return whatever the uncompressed implementation returns |
| */ |
| @Override |
| public MatrixValue replaceOperations(MatrixValue result, double pattern, double replacement) { |
| printDecompressWarning("replaceOperations"); |
| //not compressed: invoke the parent implementation explicitly; calling the |
| //method on 'this' would re-enter this override and never terminate |
| if( !isCompressed() ) |
| return super.replaceOperations(result, pattern, replacement); |
| return decompress().replaceOperations(result, pattern, replacement); |
| } |
| |
| /** |
| * Fallback for ctable (scalar, matrix variant): both matrix operands are |
| * brought into uncompressed form and the uncompressed implementation is |
| * used. All other arguments are passed through unchanged. |
| * |
| * @param that second matrix operand; decompressed first if compressed |
| */ |
| @Override |
| public void ctableOperations(Operator op, double scalar, |
| MatrixValue that, CTableMap resultMap, MatrixBlock resultBlock) { |
| printDecompressWarning("ctableOperations"); |
| //not compressed: invoke the parent implementation explicitly; calling the |
| //method on 'this' would re-enter this override and never terminate |
| if( !isCompressed() ) { |
| super.ctableOperations(op, scalar, getUncompressed(that), resultMap, resultBlock); |
| return; |
| } |
| MatrixBlock left = decompress(); |
| MatrixBlock right = getUncompressed(that); |
| left.ctableOperations(op, scalar, right, resultMap, resultBlock); |
| } |
| |
| /** |
| * Fallback for ctable (two-scalar variant): runs the uncompressed |
| * implementation, decompressing this block first if it is compressed. |
| * All arguments are passed through unchanged. |
| */ |
| @Override |
| public void ctableOperations(Operator op, double scalar, |
| double scalar2, CTableMap resultMap, MatrixBlock resultBlock) { |
| printDecompressWarning("ctableOperations"); |
| //not compressed: invoke the parent implementation explicitly; calling the |
| //method on 'this' would re-enter this override and never terminate |
| if( !isCompressed() ) { |
| super.ctableOperations(op, scalar, scalar2, resultMap, resultBlock); |
| return; |
| } |
| decompress().ctableOperations(op, scalar, scalar2, resultMap, resultBlock); |
| } |
| |
| /** |
| * Fallback for ctable (matrix-index variant): runs the uncompressed |
| * implementation, decompressing this block first if it is compressed. |
| * All arguments are passed through unchanged. |
| */ |
| @Override |
| public void ctableOperations(Operator op, MatrixIndexes ix1, |
| double scalar, boolean left, int brlen, CTableMap resultMap, |
| MatrixBlock resultBlock) { |
| printDecompressWarning("ctableOperations"); |
| //not compressed: invoke the parent implementation explicitly; calling the |
| //method on 'this' would re-enter this override and never terminate |
| if( !isCompressed() ) { |
| super.ctableOperations(op, ix1, scalar, left, brlen, resultMap, resultBlock); |
| return; |
| } |
| decompress().ctableOperations(op, ix1, scalar, left, brlen, resultMap, resultBlock); |
| } |
| |
| /** |
| * Fallback for ctable (matrix, scalar variant): both matrix operands are |
| * brought into uncompressed form and the uncompressed implementation is |
| * used. All other arguments are passed through unchanged. |
| * |
| * @param that second matrix operand; decompressed first if compressed |
| */ |
| @Override |
| public void ctableOperations(Operator op, MatrixValue that, |
| double scalar, boolean ignoreZeros, CTableMap resultMap, |
| MatrixBlock resultBlock) { |
| printDecompressWarning("ctableOperations"); |
| //not compressed: invoke the parent implementation explicitly; calling the |
| //method on 'this' would re-enter this override and never terminate |
| if( !isCompressed() ) { |
| super.ctableOperations(op, getUncompressed(that), scalar, ignoreZeros, resultMap, resultBlock); |
| return; |
| } |
| MatrixBlock left = decompress(); |
| MatrixBlock right = getUncompressed(that); |
| left.ctableOperations(op, right, scalar, ignoreZeros, resultMap, resultBlock); |
| } |
| |
| /** |
| * Fallback for ctable with a sequence: both matrix operands are brought |
| * into uncompressed form and the uncompressed implementation is used. |
| * |
| * @param that second matrix operand; decompressed first if compressed |
| * @param scalar passed through |
| * @param resultBlock passed through |
| * @return whatever the uncompressed implementation returns |
| */ |
| @Override |
| public MatrixBlock ctableSeqOperations(MatrixValue that, double scalar, MatrixBlock resultBlock) { |
| printDecompressWarning("ctableOperations"); |
| MatrixBlock right = getUncompressed(that); |
| //never call ctableSeqOperations on 'this' here: that resolves to this very |
| //override again and recurses without end, whatever the compression state |
| if( !isCompressed() ) |
| return super.ctableSeqOperations(right, scalar, resultBlock); |
| return decompress().ctableSeqOperations(right, scalar, resultBlock); |
| } |
| |
| /** |
| * Fallback for ctable (two-matrix variant without result block): all |
| * matrix operands are brought into uncompressed form and the uncompressed |
| * implementation is used. |
| * |
| * @param that second matrix operand; decompressed first if compressed |
| * @param that2 third matrix operand; decompressed first if compressed |
| */ |
| @Override |
| public void ctableOperations(Operator op, MatrixValue that, |
| MatrixValue that2, CTableMap resultMap) { |
| printDecompressWarning("ctableOperations"); |
| //not compressed: invoke the parent implementation explicitly; calling the |
| //method on 'this' would re-enter this override and never terminate |
| if( !isCompressed() ) { |
| MatrixBlock second = getUncompressed(that); |
| MatrixBlock third = getUncompressed(that2); |
| super.ctableOperations(op, second, third, resultMap); |
| return; |
| } |
| MatrixBlock left = decompress(); |
| MatrixBlock right1 = getUncompressed(that); |
| MatrixBlock right2 = getUncompressed(that2); |
| left.ctableOperations(op, right1, right2, resultMap); |
| } |
| |
| /** |
| * Fallback for ctable (two-matrix variant with result block): all matrix |
| * operands are brought into uncompressed form and the uncompressed |
| * implementation is used. |
| * |
| * @param that second matrix operand; decompressed first if compressed |
| * @param that2 third matrix operand; decompressed first if compressed |
| */ |
| @Override |
| public void ctableOperations(Operator op, MatrixValue that, |
| MatrixValue that2, CTableMap resultMap, MatrixBlock resultBlock) { |
| printDecompressWarning("ctableOperations"); |
| //not compressed: invoke the parent implementation explicitly; calling the |
| //method on 'this' would re-enter this override and never terminate |
| if( !isCompressed() ) { |
| MatrixBlock second = getUncompressed(that); |
| MatrixBlock third = getUncompressed(that2); |
| super.ctableOperations(op, second, third, resultMap, resultBlock); |
| return; |
| } |
| MatrixBlock left = decompress(); |
| MatrixBlock right1 = getUncompressed(that); |
| MatrixBlock right2 = getUncompressed(that2); |
| left.ctableOperations(op, right1, right2, resultMap, resultBlock); |
| } |
| |
| /** |
| * Fallback for ternary operations: all operands are brought into |
| * uncompressed form and the uncompressed implementation is used. |
| * |
| * @param m2 second operand; decompressed first if it is compressed |
| * @param m3 third operand; decompressed first if it is compressed |
| * @return whatever the uncompressed implementation returns |
| */ |
| @Override |
| public MatrixBlock ternaryOperations(TernaryOperator op, MatrixBlock m2, MatrixBlock m3, MatrixBlock ret) { |
| printDecompressWarning("ternaryOperations"); |
| //not compressed: invoke the parent implementation explicitly; calling the |
| //method on 'this' would re-enter this override and never terminate |
| if( !isCompressed() ) { |
| MatrixBlock second = getUncompressed(m2); |
| MatrixBlock third = getUncompressed(m3); |
| return super.ternaryOperations(op, second, third, ret); |
| } |
| MatrixBlock left = decompress(); |
| MatrixBlock right1 = getUncompressed(m2); |
| MatrixBlock right2 = getUncompressed(m3); |
| return left.ternaryOperations(op, right1, right2, ret); |
| } |
| |
| /** |
| * Convenience overload: delegates to the six-argument variant with k=1. |
| */ |
| @Override |
| public MatrixBlock quaternaryOperations(QuaternaryOperator qop, |
| MatrixBlock um, MatrixBlock vm, MatrixBlock wm, MatrixBlock out) { |
| final int defaultK = 1; |
| return quaternaryOperations(qop, um, vm, wm, out, defaultK); |
| } |
| |
| /** |
| * Fallback for quaternary operations: all operands are brought into |
| * uncompressed form and the uncompressed implementation is used. |
| * |
| * @param um first factor; decompressed first if it is compressed |
| * @param vm second factor; decompressed first if it is compressed |
| * @param wm third factor; decompressed first if it is compressed |
| * @return whatever the uncompressed implementation returns |
| */ |
| @Override |
| public MatrixBlock quaternaryOperations(QuaternaryOperator qop, MatrixBlock um, |
| MatrixBlock vm, MatrixBlock wm, MatrixBlock out, int k) { |
| printDecompressWarning("quaternaryOperations"); |
| //not compressed: invoke the parent implementation explicitly; calling the |
| //method on 'this' would re-enter this override and never terminate |
| if( !isCompressed() ) { |
| MatrixBlock u = getUncompressed(um); |
| MatrixBlock v = getUncompressed(vm); |
| MatrixBlock w = getUncompressed(wm); |
| return super.quaternaryOperations(qop, u, v, w, out, k); |
| } |
| MatrixBlock left = decompress(); |
| MatrixBlock right1 = getUncompressed(um); |
| MatrixBlock right2 = getUncompressed(vm); |
| MatrixBlock right3 = getUncompressed(wm); |
| return left.quaternaryOperations(qop, right1, right2, right3, out, k); |
| } |
| |
| /** |
| * Not supported by this class; always throws. |
| * |
| * @throws DMLRuntimeException on every call |
| */ |
| @Override |
| public MatrixBlock randOperationsInPlace(RandomMatrixGenerator rgen, Well1024a bigrand, long bSeed) { |
| final String msg = "CompressedMatrixBlock: randOperationsInPlace not supported."; |
| throw new DMLRuntimeException(msg); |
| } |
| |
| /** |
| * Not supported by this class; always throws. |
| * |
| * @throws DMLRuntimeException on every call |
| */ |
| @Override |
| public MatrixBlock randOperationsInPlace(RandomMatrixGenerator rgen, Well1024a bigrand, long bSeed, int k) { |
| final String msg = "CompressedMatrixBlock: randOperationsInPlace not supported."; |
| throw new DMLRuntimeException(msg); |
| } |
| |
| /** |
| * Not supported by this class; always throws. |
| * |
| * @throws DMLRuntimeException on every call |
| */ |
| @Override |
| public MatrixBlock seqOperationsInPlace(double from, double to, double incr) { |
| //output should always be uncompressed |
| final String msg = "CompressedMatrixBlock: seqOperationsInPlace not supported."; |
| throw new DMLRuntimeException(msg); |
| } |
| |
| /** |
| * Tells whether the given block is a CompressedMatrixBlock that reports |
| * itself as compressed. Returns false for any other block, including null. |
| */ |
| private static boolean isCompressed(MatrixBlock mb) { |
| if( !(mb instanceof CompressedMatrixBlock) ) |
| return false; |
| CompressedMatrixBlock cmb = (CompressedMatrixBlock)mb; |
| return cmb.isCompressed(); |
| } |
| |
| private static MatrixBlock getUncompressed(MatrixValue mVal) { |
| return isCompressed((MatrixBlock)mVal) ? |
| ((CompressedMatrixBlock)mVal).decompress() : |
| (MatrixBlock)mVal; |
| } |
| |
| /** |
| * Logs a warning that the named operation decompresses, but only when |
| * this block is compressed. |
| * |
| * @param operation operation name inserted into the message |
| */ |
| private void printDecompressWarning(String operation) { |
| if( !isCompressed() ) |
| return; |
| LOG.warn("Operation '"+operation+"' not supported yet - decompressing for ULA operations."); |
| } |
| |
| /** |
| * Logs a warning that the named operation decompresses, when this block |
| * or the given second operand is compressed. |
| * |
| * @param operation operation name inserted into the message |
| * @param m2 second operand that is checked as well |
| */ |
| private void printDecompressWarning(String operation, MatrixBlock m2) { |
| if( !isCompressed() && !isCompressed(m2) ) |
| return; |
| LOG.warn("Operation '"+operation+"' not supported yet - decompressing for ULA operations."); |
| } |
| |
| /** |
| * Builds the set {from, from+incr, from+2*incr, ...} of all such values |
| * that are not larger than to. The set is empty if from is larger than to. |
| * |
| * @param from first value |
| * @param to inclusive upper bound |
| * @param incr step width, must be positive |
| * @return set of the generated values |
| * @throws IllegalArgumentException if incr is not positive |
| */ |
| private static HashSet<Integer> seq(int from, int to, int incr) { |
| //a non-positive step would never reach the upper bound |
| if( incr <= 0 ) |
| throw new IllegalArgumentException("Invalid increment: "+incr); |
| HashSet<Integer> ret = new HashSet<>(); |
| //count in long so that the last step cannot wrap around near Integer.MAX_VALUE |
| for (long i = from; i <= to; i+=incr) |
| ret.add((int)i); |
| return ret; |
| } |
| |
| /** |
| * Iterator over the cells of the column groups at positions cgl (inclusive) |
| * to cgu (exclusive), chaining the per-group iterators one after another. |
| * Groups whose iterator has no entries are skipped. |
| */ |
| private class ColumnGroupIterator implements Iterator<IJV> |
| { |
| //iterator configuration |
| private final int _rl; |
| private final int _ru; |
| private final int _cgu; |
| private final boolean _inclZeros; |
| |
| //iterator state |
| private int _posColGroup = -1; |
| private Iterator<IJV> _iterColGroup = null; |
| private boolean _noNext = false; |
| |
| public ColumnGroupIterator(int rl, int ru, int cgl, int cgu, boolean inclZeros) { |
| _rl = rl; |
| _ru = ru; |
| _cgu = cgu; |
| _inclZeros = inclZeros; |
| //start one position before the first group; getNextIterator advances first |
| _posColGroup = cgl-1; |
| getNextIterator(); |
| } |
| |
| @Override |
| public boolean hasNext() { |
| return !_noNext; |
| } |
| |
| /** |
| * Returns the next cell and moves on to the next non-empty group |
| * once the current group is exhausted. |
| * |
| * @throws java.util.NoSuchElementException if there are no more entries |
| */ |
| @Override |
| public IJV next() { |
| //signal exhaustion with the exception type of the Iterator contract; |
| //it is a RuntimeException, so existing handlers keep working |
| if( _noNext ) |
| throw new java.util.NoSuchElementException("No more entries."); |
| IJV ret = _iterColGroup.next(); |
| if( !_iterColGroup.hasNext() ) |
| getNextIterator(); |
| return ret; |
| } |
| |
| //advances to the next group that has at least one entry, |
| //or marks the iterator as exhausted if there is none |
| private void getNextIterator() { |
| while( _posColGroup+1 < _cgu ) { |
| _posColGroup++; |
| _iterColGroup = _colGroups.get(_posColGroup) |
| .getIterator(_rl, _ru, _inclZeros, false); |
| if( _iterColGroup.hasNext() ) |
| return; |
| } |
| _noNext = true; |
| } |
| } |
| |
| /** |
| * Base class of the row iterators: holds one row iterator per column |
| * group and the current row position, which starts at rl. |
| * hasNext() is true as long as the position is below ru. |
| */ |
| private abstract class RowIterator<T> implements Iterator<T> |
| { |
| //iterator configuration |
| protected final int _rl; |
| protected final int _ru; |
| |
| //iterator state |
| protected ColGroupRowIterator[] _iters = null; |
| protected int _rpos; |
| |
| public RowIterator(int rl, int ru) { |
| _rl = rl; |
| _ru = ru; |
| |
| //position on the first row of the range |
| _rpos = rl; |
| |
| //one row iterator per column group, in group order |
| final int numGroups = _colGroups.size(); |
| _iters = new ColGroupRowIterator[numGroups]; |
| for( int g=0; g<numGroups; g++ ) |
| _iters[g] = _colGroups.get(g).getRowIterator(_rl, _ru); |
| } |
| |
| @Override |
| public boolean hasNext() { |
| return (_rpos < _ru); |
| } |
| } |
| |
| /** |
| * Row iterator that returns each row as a dense array of length clen. |
| * The same array instance is reused and overwritten on every call. |
| */ |
| private class DenseRowIterator extends RowIterator<double[]> |
| { |
| private final double[] _ret = new double[clen]; |
| |
| public DenseRowIterator(int rl, int ru) { |
| super(rl, ru); |
| } |
| |
| @Override |
| public double[] next() { |
| //per-row meta data shared by all column groups |
| final int segPos = _rpos % BitmapEncoder.BITMAP_BLOCK_SZ; |
| final boolean isLast = (_rpos+1 == _ru); |
| //reset the buffer, then let each group write its part of the row |
| Arrays.fill(_ret, 0); |
| for( ColGroupRowIterator iter : _iters ) |
| iter.next(_ret, _rpos, segPos, isLast); |
| //move on to the next row and hand out the buffer |
| _rpos++; |
| return _ret; |
| } |
| } |
| |
| /** |
| * Row iterator that returns each row as a sparse row. The row is first |
| * assembled in a dense buffer of length clen and then appended to the |
| * sparse row. The same sparse row instance is reused on every call. |
| */ |
| private class SparseRowIterator extends RowIterator<SparseRow> |
| { |
| private final SparseRowVector _ret = new SparseRowVector(clen); |
| private final double[] _tmp = new double[clen]; |
| |
| public SparseRowIterator(int rl, int ru) { |
| super(rl, ru); |
| } |
| |
| @Override |
| public SparseRow next() { |
| //prepare meta data common across column groups |
| final int blksz = BitmapEncoder.BITMAP_BLOCK_SZ; |
| final int ix = _rpos % blksz; |
| final boolean last = (_rpos+1 == _ru); |
| //reset the reused dense buffer, as the dense row iterator does, so that |
| //cells a group does not write cannot keep values of the previous row |
| Arrays.fill(_tmp, 0); |
| //copy group rows into consolidated dense vector |
| //to avoid binary search+shifting or final sort |
| for(int j=0; j<_iters.length; j++) |
| _iters[j].next(_tmp, _rpos, ix, last); |
| //append the buffered values to the consolidated sparse row |
| //(presumably append skips zeros - confirm in SparseRowVector) |
| _ret.setSize(0); |
| for(int i=0; i<_tmp.length; i++) |
| _ret.append(i, _tmp[i]); |
| //advance to next row and return buffer |
| _rpos++; |
| return _ret; |
| } |
| } |
| } |