| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.sysds.runtime.matrix.data; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Comparator; |
| import java.util.List; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Future; |
| |
| import org.apache.sysds.common.Types.CorrectionLocationType; |
| import org.apache.sysds.runtime.DMLRuntimeException; |
| import org.apache.sysds.runtime.codegen.SpoofOperator.SideInput; |
| import org.apache.sysds.runtime.codegen.SpoofOperator.SideInputSparseCell; |
| import org.apache.sysds.runtime.controlprogram.caching.MatrixObject.UpdateType; |
| import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer; |
| import org.apache.sysds.runtime.data.DenseBlock; |
| import org.apache.sysds.runtime.data.DenseBlockFactory; |
| import org.apache.sysds.runtime.data.SparseBlock; |
| import org.apache.sysds.runtime.data.SparseBlockCSR; |
| import org.apache.sysds.runtime.data.SparseBlockFactory; |
| import org.apache.sysds.runtime.functionobjects.Builtin; |
| import org.apache.sysds.runtime.functionobjects.Builtin.BuiltinCode; |
| import org.apache.sysds.runtime.functionobjects.CM; |
| import org.apache.sysds.runtime.functionobjects.IndexFunction; |
| import org.apache.sysds.runtime.functionobjects.KahanFunction; |
| import org.apache.sysds.runtime.functionobjects.KahanPlus; |
| import org.apache.sysds.runtime.functionobjects.KahanPlusSq; |
| import org.apache.sysds.runtime.functionobjects.Mean; |
| import org.apache.sysds.runtime.functionobjects.Multiply; |
| import org.apache.sysds.runtime.functionobjects.ReduceAll; |
| import org.apache.sysds.runtime.functionobjects.ReduceCol; |
| import org.apache.sysds.runtime.functionobjects.ReduceDiag; |
| import org.apache.sysds.runtime.functionobjects.ReduceRow; |
| import org.apache.sysds.runtime.functionobjects.ValueFunction; |
| import org.apache.sysds.runtime.instructions.InstructionUtils; |
| import org.apache.sysds.runtime.instructions.cp.CM_COV_Object; |
| import org.apache.sysds.runtime.instructions.cp.KahanObject; |
| import org.apache.sysds.runtime.matrix.operators.AggregateOperator; |
| import org.apache.sysds.runtime.matrix.operators.AggregateTernaryOperator; |
| import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator; |
| import org.apache.sysds.runtime.matrix.operators.CMOperator; |
| import org.apache.sysds.runtime.matrix.operators.CMOperator.AggregateOperationTypes; |
| import org.apache.sysds.runtime.matrix.operators.Operator; |
| import org.apache.sysds.runtime.matrix.operators.UnaryOperator; |
| import org.apache.sysds.runtime.util.CommonThreadPool; |
| import org.apache.sysds.runtime.util.DataConverter; |
| import org.apache.sysds.runtime.util.UtilFunctions; |
| |
| |
| /** |
| * MB: |
| * Library for matrix aggregations including ak+, uak+ for all |
| * combinations of dense and sparse representations, and corrections. |
| * Those are performance-critical operations because they are used |
| * on combiners/reducers of important operations like tsmm, mvmult, |
| * indexing, but also basic sum/min/max/mean, row*, col*, etc. Specific |
| * handling is especially required for all non sparse-safe operations |
| * in order to prevent unnecessary worse asymptotic behavior. |
| * |
| * This library currently covers the following opcodes: |
| * ak+, uak+, uark+, uack+, uasqk+, uarsqk+, uacsqk+, |
| * uamin, uarmin, uacmin, uamax, uarmax, uacmax, |
| * ua*, uamean, uarmean, uacmean, uavar, uarvar, uacvar, |
| * uarimax, uaktrace, cumk+, cummin, cummax, cum*, tak+. |
| * |
| * TODO next opcode extensions: a+, colindexmax |
| */ |
| public class LibMatrixAgg |
| { |
| // private static final Log LOG = LogFactory.getLog(LibMatrixAgg.class.getName()); |
| |
| //internal configuration parameters |
| private static final boolean NAN_AWARENESS = false; |
| private static final long PAR_NUMCELL_THRESHOLD1 = 1024*1024; //Min 1M elements |
| private static final long PAR_NUMCELL_THRESHOLD2 = 16*1024; //Min 16K elements |
| private static final long PAR_INTERMEDIATE_SIZE_THRESHOLD = 2*1024*1024; //Max 2MB |
| |
| //////////////////////////////// |
| // public matrix agg interface |
| //////////////////////////////// |
| |
| private enum AggType { |
| KAHAN_SUM, |
| KAHAN_SUM_SQ, |
| CUM_KAHAN_SUM, |
| CUM_MIN, |
| CUM_MAX, |
| CUM_PROD, |
| CUM_SUM_PROD, |
| MIN, |
| MAX, |
| MEAN, |
| VAR, |
| MAX_INDEX, |
| MIN_INDEX, |
| PROD, |
| INVALID, |
| } |
| |
| private LibMatrixAgg() { |
| //prevent instantiation via private constructor |
| } |
| |
| /** |
| * Core incremental matrix aggregate (ak+) as used in mapmult, tsmm, |
| * cpmm, etc. Note that we try to keep the current |
| * aggVal and aggCorr in dense format in order to allow efficient |
| * access according to the dense/sparse input. |
| * |
| * @param in input matrix |
| * @param aggVal current aggregate values (in/out) |
| * @param aggCorr current aggregate correction (in/out) |
| * @param deep deep copy flag |
| */ |
| public static void aggregateBinaryMatrix(MatrixBlock in, MatrixBlock aggVal, MatrixBlock aggCorr, boolean deep) { |
| //Timing time = new Timing(true); |
| //boolean saggVal = aggVal.sparse, saggCorr = aggCorr.sparse; |
| //long naggVal = aggVal.nonZeros, naggCorr = aggCorr.nonZeros; |
| |
| //common empty block handling |
| if( in.isEmptyBlock(false) ) { |
| return; |
| } |
| if( !deep && aggVal.isEmptyBlock(false) ) { |
| //shallow copy without correction allocation |
| aggVal.copyShallow(in); |
| return; |
| } |
| |
| //ensure MCSR instead of CSR for update in-place |
| if( aggVal.sparse && aggVal.isAllocated() && aggVal.getSparseBlock() instanceof SparseBlockCSR ) |
| aggVal.sparseBlock = SparseBlockFactory.copySparseBlock(SparseBlock.Type.MCSR, aggVal.getSparseBlock(), true); |
| if( aggCorr.sparse && aggCorr.isAllocated() && aggCorr.getSparseBlock() instanceof SparseBlockCSR ) |
| aggCorr.sparseBlock = SparseBlockFactory.copySparseBlock(SparseBlock.Type.MCSR, aggCorr.getSparseBlock(), true); |
| |
| //core aggregation |
| if(!in.sparse && !aggVal.sparse && !aggCorr.sparse) |
| aggregateBinaryMatrixAllDense(in, aggVal, aggCorr); |
| else if(in.sparse && !aggVal.sparse && !aggCorr.sparse) |
| aggregateBinaryMatrixSparseDense(in, aggVal, aggCorr); |
| else if(in.sparse ) //any aggVal, aggCorr |
| aggregateBinaryMatrixSparseGeneric(in, aggVal, aggCorr); |
| else //if( !in.sparse ) //any aggVal, aggCorr |
| aggregateBinaryMatrixDenseGeneric(in, aggVal, aggCorr); |
| |
| //System.out.println("agg ("+in.rlen+","+in.clen+","+in.nonZeros+","+in.sparse+"), ("+naggVal+","+saggVal+"), ("+naggCorr+","+saggCorr+") -> " + |
| // "("+aggVal.nonZeros+","+aggVal.sparse+"), ("+aggCorr.nonZeros+","+aggCorr.sparse+") in "+time.stop()+"ms."); |
| } |
| |
| /** |
| * Core incremental matrix aggregate (ak+) as used for uack+ and acrk+. |
| * Embedded correction values. |
| * |
| * @param in matrix block |
| * @param aggVal aggregate operator |
| * @param aop aggregate operator |
| */ |
| public static void aggregateBinaryMatrix(MatrixBlock in, MatrixBlock aggVal, AggregateOperator aop) { |
| //sanity check matching dimensions |
| if( in.getNumRows()!=aggVal.getNumRows() || in.getNumColumns()!=aggVal.getNumColumns() ) |
| throw new DMLRuntimeException("Dimension mismatch on aggregate: "+in.getNumRows()+"x"+in.getNumColumns()+ |
| " vs "+aggVal.getNumRows()+"x"+aggVal.getNumColumns()); |
| |
| //Timing time = new Timing(true); |
| |
| //core aggregation |
| boolean lastRowCorr = (aop.correction == CorrectionLocationType.LASTROW); |
| boolean lastColCorr = (aop.correction == CorrectionLocationType.LASTCOLUMN); |
| if( !in.sparse && lastRowCorr ) |
| aggregateBinaryMatrixLastRowDenseGeneric(in, aggVal); |
| else if( in.sparse && lastRowCorr ) |
| aggregateBinaryMatrixLastRowSparseGeneric(in, aggVal); |
| else if( !in.sparse && lastColCorr ) |
| aggregateBinaryMatrixLastColDenseGeneric(in, aggVal); |
| else //if( in.sparse && lastColCorr ) |
| aggregateBinaryMatrixLastColSparseGeneric(in, aggVal); |
| |
| //System.out.println("agg ("+in.rlen+","+in.clen+","+in.getNonZeros()+","+in.sparse+"), ("+naggVal+","+saggVal+") -> " + |
| // "("+aggVal.getNonZeros()+","+aggVal.isInSparseFormat()+") in "+time.stop()+"ms."); |
| } |
| |
| public static void aggregateUnaryMatrix(MatrixBlock in, MatrixBlock out, AggregateUnaryOperator uaop) { |
| |
| AggType aggtype = getAggType(uaop); |
| final int m = in.rlen; |
| final int m2 = out.rlen; |
| final int n2 = out.clen; |
| |
| //filter empty input blocks (incl special handling for sparse-unsafe operations) |
| if( in.isEmptyBlock(false) ){ |
| aggregateUnaryMatrixEmpty(in, out, aggtype, uaop.indexFn); |
| return; |
| } |
| |
| //Timing time = new Timing(true); |
| |
| //allocate output arrays (if required) |
| out.reset(m2, n2, false); //always dense |
| out.allocateDenseBlock(); |
| |
| if( !in.sparse ) |
| aggregateUnaryMatrixDense(in, out, aggtype, uaop.aggOp.increOp.fn, uaop.indexFn, 0, m); |
| else |
| aggregateUnaryMatrixSparse(in, out, aggtype, uaop.aggOp.increOp.fn, uaop.indexFn, 0, m); |
| |
| //cleanup output and change representation (if necessary) |
| out.recomputeNonZeros(); |
| out.examSparsity(); |
| } |
| |
| public static void aggregateUnaryMatrix(MatrixBlock in, MatrixBlock out, AggregateUnaryOperator uaop, int k) { |
| //fall back to sequential version if necessary |
| if( !satisfiesMultiThreadingConstraints(in, out, uaop, k) ) { |
| aggregateUnaryMatrix(in, out, uaop); |
| return; |
| } |
| |
| //prepare meta data |
| AggType aggtype = getAggType(uaop); |
| final int m = in.rlen; |
| final int m2 = out.rlen; |
| final int n2 = out.clen; |
| |
| //filter empty input blocks (incl special handling for sparse-unsafe operations) |
| if( in.isEmptyBlock(false) ){ |
| aggregateUnaryMatrixEmpty(in, out, aggtype, uaop.indexFn); |
| return; |
| } |
| |
| //Timing time = new Timing(true); |
| |
| //allocate output arrays (if required) |
| if( uaop.indexFn instanceof ReduceCol ) { |
| out.reset(m2, n2, false); //always dense |
| out.allocateDenseBlock(); |
| } |
| |
| //core multi-threaded unary aggregate computation |
| //(currently: always parallelization over number of rows) |
| try { |
| ExecutorService pool = CommonThreadPool.get(k); |
| ArrayList<AggTask> tasks = new ArrayList<>(); |
| ArrayList<Integer> blklens = UtilFunctions.getBalancedBlockSizesDefault(m, k, |
| (uaop.indexFn instanceof ReduceRow)); //use static partitioning for col*() |
| for( int i=0, lb=0; i<blklens.size(); lb+=blklens.get(i), i++ ) { |
| tasks.add( (uaop.indexFn instanceof ReduceCol) ? |
| new RowAggTask(in, out, aggtype, uaop, lb, lb+blklens.get(i)) : |
| new PartialAggTask(in, out, aggtype, uaop, lb, lb+blklens.get(i)) ); |
| } |
| pool.invokeAll(tasks); |
| pool.shutdown(); |
| //aggregate partial results |
| if( !(uaop.indexFn instanceof ReduceCol) ) { |
| out.copy(((PartialAggTask)tasks.get(0)).getResult()); //for init |
| for( int i=1; i<tasks.size(); i++ ) |
| aggregateFinalResult(uaop.aggOp, out, ((PartialAggTask)tasks.get(i)).getResult()); |
| } |
| } |
| catch(Exception ex) { |
| throw new DMLRuntimeException(ex); |
| } |
| |
| //cleanup output and change representation (if necessary) |
| out.recomputeNonZeros(); |
| out.examSparsity(); |
| |
| //System.out.println("uagg k="+k+" ("+in.rlen+","+in.clen+","+in.sparse+") in "+time.stop()+"ms."); |
| } |
| |
| public static MatrixBlock cumaggregateUnaryMatrix(MatrixBlock in, MatrixBlock out, UnaryOperator uop) { |
| return cumaggregateUnaryMatrix(in, out, uop, null); |
| } |
| |
| public static MatrixBlock cumaggregateUnaryMatrix(MatrixBlock in, MatrixBlock out, UnaryOperator uop, double[] agg) { |
| //prepare meta data |
| AggType aggtype = getAggType(uop); |
| final int m = in.rlen; |
| final int m2 = out.rlen; |
| final int n2 = out.clen; |
| |
| //filter empty input blocks (incl special handling for sparse-unsafe operations) |
| if( in.isEmpty() && (agg == null || aggtype == AggType.CUM_SUM_PROD) ) { |
| return aggregateUnaryMatrixEmpty(in, out, aggtype, null); |
| } |
| |
| //allocate output arrays (if required) |
| if( !uop.isInplace() || in.isInSparseFormat() || in.isEmpty() ) { |
| out.reset(m2, n2, false); //always dense |
| out.allocateDenseBlock(); |
| if( in.isEmpty() ) |
| in.allocateBlock(); |
| } |
| else { |
| out = in; |
| } |
| |
| //Timing time = new Timing(true); |
| |
| if( !in.sparse ) |
| cumaggregateUnaryMatrixDense(in, out, aggtype, uop.fn, agg, 0, m); |
| else |
| cumaggregateUnaryMatrixSparse(in, out, aggtype, uop.fn, agg, 0, m); |
| |
| //cleanup output and change representation (if necessary) |
| out.recomputeNonZeros(); |
| out.examSparsity(); |
| |
| //System.out.println("uop ("+in.rlen+","+in.clen+","+in.sparse+") in "+time.stop()+"ms."); |
| |
| return out; |
| } |
| |
| public static MatrixBlock cumaggregateUnaryMatrix(MatrixBlock in, MatrixBlock out, UnaryOperator uop, int k) { |
| AggregateUnaryOperator uaop = InstructionUtils.parseBasicCumulativeAggregateUnaryOperator(uop); |
| |
| //fall back to sequential if necessary or agg not supported |
| if( k <= 1 || (long)in.rlen*in.clen < PAR_NUMCELL_THRESHOLD1 || in.rlen <= k |
| || out.clen*8*k > PAR_INTERMEDIATE_SIZE_THRESHOLD || uaop == null || !out.isThreadSafe()) { |
| return cumaggregateUnaryMatrix(in, out, uop); |
| } |
| |
| //prepare meta data |
| AggType aggtype = getAggType(uop); |
| final int m = in.rlen; |
| final int m2 = out.rlen; |
| final int n2 = out.clen; |
| final int mk = aggtype==AggType.CUM_KAHAN_SUM?2:1; |
| |
| //filter empty input blocks (incl special handling for sparse-unsafe operations) |
| if( in.isEmpty() ){ |
| return aggregateUnaryMatrixEmpty(in, out, aggtype, null); |
| } |
| |
| //Timing time = new Timing(true); |
| |
| //allocate output arrays (if required) |
| if( !uop.isInplace() || in.isInSparseFormat() || in.isEmpty() ) { |
| out.reset(m2, n2, false); //always dense |
| out.allocateDenseBlock(); |
| } |
| else { |
| out = in; |
| } |
| |
| //core multi-threaded unary aggregate computation |
| //(currently: always parallelization over number of rows) |
| try { |
| ExecutorService pool = CommonThreadPool.get(k); |
| int blklen = (int)(Math.ceil((double)m/k)); |
| |
| //step 1: compute aggregates per row partition |
| AggType uaoptype = getAggType(uaop); |
| ArrayList<PartialAggTask> tasks = new ArrayList<>(); |
| for( int i=0; i<k & i*blklen<m; i++ ) |
| tasks.add( new PartialAggTask(in, new MatrixBlock(mk,n2,false), uaoptype, uaop, i*blklen, Math.min((i+1)*blklen, m)) ); |
| List<Future<Object>> taskret = pool.invokeAll(tasks); |
| for( Future<Object> task : taskret ) |
| task.get(); //check for errors |
| |
| //step 2: cumulative aggregate of partition aggregates |
| MatrixBlock tmp = new MatrixBlock(tasks.size(), n2, false); |
| for( int i=0; i<tasks.size(); i++ ) { |
| MatrixBlock row = tasks.get(i).getResult(); |
| if( uaop.aggOp.existsCorrection() ) |
| row.dropLastRowsOrColumns(uaop.aggOp.correction); |
| tmp.leftIndexingOperations(row, i, i, 0, n2-1, tmp, UpdateType.INPLACE_PINNED); |
| } |
| MatrixBlock tmp2 = cumaggregateUnaryMatrix(tmp, new MatrixBlock(tasks.size(), n2, false), uop); |
| |
| //step 3: compute final cumulative aggregate |
| ArrayList<CumAggTask> tasks2 = new ArrayList<>(); |
| for( int i=0; i<k & i*blklen<m; i++ ) { |
| double[] agg = (i==0)? null : |
| DataConverter.convertToDoubleVector(tmp2.slice(i-1, i-1, 0, n2-1, new MatrixBlock()), false); |
| tasks2.add( new CumAggTask(in, agg, out, aggtype, uop, i*blklen, Math.min((i+1)*blklen, m)) ); |
| } |
| List<Future<Long>> taskret2 = pool.invokeAll(tasks2); |
| pool.shutdown(); |
| |
| //step 4: aggregate nnz |
| out.nonZeros = 0; |
| for( Future<Long> task : taskret2 ) |
| out.nonZeros += task.get(); |
| } |
| catch(Exception ex) { |
| throw new DMLRuntimeException(ex); |
| } |
| |
| //cleanup output and change representation (if necessary) |
| out.examSparsity(); |
| |
| //System.out.println("uop k="+k+" ("+in.rlen+","+in.clen+","+in.sparse+") in "+time.stop()+"ms."); |
| |
| return out; |
| } |
| |
| public static MatrixBlock aggregateTernary(MatrixBlock in1, MatrixBlock in2, MatrixBlock in3, MatrixBlock ret, AggregateTernaryOperator op) { |
| //early abort if any block is empty |
| if( in1.isEmptyBlock(false) || in2.isEmptyBlock(false) || in3!=null&&in3.isEmptyBlock(false) ) { |
| return ret; |
| } |
| |
| //Timing time = new Timing(true); |
| |
| //allocate output arrays (if required) |
| ret.reset(ret.rlen, ret.clen, false); //always dense |
| ret.allocateDenseBlock(); |
| |
| IndexFunction ixFn = op.indexFn; |
| if( !in1.sparse && !in2.sparse && (in3==null||!in3.sparse) ) //DENSE |
| aggregateTernaryDense(in1, in2, in3, ret, ixFn, 0, in1.rlen); |
| else //GENERAL CASE |
| aggregateTernaryGeneric(in1, in2, in3, ret, ixFn, 0, in1.rlen); |
| |
| //cleanup output and change representation (if necessary) |
| ret.recomputeNonZeros(); |
| ret.examSparsity(); |
| |
| //System.out.println("tak+ ("+in1.rlen+","+in1.sparse+","+in2.sparse+","+in3.sparse+") in "+time.stop()+"ms."); |
| |
| return ret; |
| } |
| |
| public static MatrixBlock aggregateTernary(MatrixBlock in1, MatrixBlock in2, MatrixBlock in3, MatrixBlock ret, AggregateTernaryOperator op, int k) { |
| //fall back to sequential version if necessary |
| if( k <= 1 || in1.nonZeros+in2.nonZeros < PAR_NUMCELL_THRESHOLD1 || in1.rlen <= k/2 |
| || (!(op.indexFn instanceof ReduceCol) && ret.clen*8*k > PAR_INTERMEDIATE_SIZE_THRESHOLD) ) { |
| return aggregateTernary(in1, in2, in3, ret, op); |
| } |
| |
| //early abort if any block is empty |
| if( in1.isEmptyBlock(false) || in2.isEmptyBlock(false) || in3!=null&&in3.isEmptyBlock(false) ) { |
| return ret; |
| } |
| |
| //Timing time = new Timing(true); |
| |
| try { |
| ExecutorService pool = CommonThreadPool.get(k); |
| ArrayList<AggTernaryTask> tasks = new ArrayList<>(); |
| int blklen = (int)(Math.ceil((double)in1.rlen/k)); |
| IndexFunction ixFn = op.indexFn; |
| for( int i=0; i<k & i*blklen<in1.rlen; i++ ) |
| tasks.add( new AggTernaryTask(in1, in2, in3, ret, ixFn, i*blklen, Math.min((i+1)*blklen, in1.rlen))); |
| List<Future<MatrixBlock>> rtasks = pool.invokeAll(tasks); |
| pool.shutdown(); |
| //aggregate partial results and error handling |
| ret.copy(rtasks.get(0).get()); //for init |
| for( int i=1; i<rtasks.size(); i++ ) |
| aggregateFinalResult(op.aggOp, ret, rtasks.get(i).get()); |
| } |
| catch(Exception ex) { |
| throw new DMLRuntimeException(ex); |
| } |
| |
| //cleanup output and change representation (if necessary) |
| ret.recomputeNonZeros(); |
| ret.examSparsity(); |
| |
| //System.out.println("tak+ k="+k+" ("+in1.rlen+","+in1.sparse+","+in2.sparse+","+in3.sparse+") in "+time.stop()+"ms."); |
| |
| return ret; |
| } |
| |
| public static void groupedAggregate(MatrixBlock groups, MatrixBlock target, MatrixBlock weights, MatrixBlock result, int numGroups, Operator op) { |
| if( !(op instanceof CMOperator || op instanceof AggregateOperator) ) { |
| throw new DMLRuntimeException("Invalid operator (" + op + ") encountered while processing groupedAggregate."); |
| } |
| |
| //CM operator for count, mean, variance |
| //note: current support only for column vectors |
| if(op instanceof CMOperator) { |
| CMOperator cmOp = (CMOperator) op; |
| if( cmOp.getAggOpType()==AggregateOperationTypes.COUNT && weights==null && target.clen==1 ) { |
| //special case for vector counts |
| groupedAggregateVecCount(groups, result, numGroups); |
| } |
| else { //general case |
| groupedAggregateCM(groups, target, weights, result, numGroups, cmOp, 0, target.clen); |
| } |
| } |
| //Aggregate operator for sum (via kahan sum) |
| //note: support for row/column vectors and dense/sparse |
| else if( op instanceof AggregateOperator ) { |
| AggregateOperator aggop = (AggregateOperator) op; |
| groupedAggregateKahanPlus(groups, target, weights, result, numGroups, aggop, 0, target.clen); |
| } |
| |
| //postprocessing sparse/dense formats |
| //(nnz already maintained via append) |
| result.examSparsity(); |
| } |
| |
| public static void groupedAggregate(MatrixBlock groups, MatrixBlock target, MatrixBlock weights, MatrixBlock result, int numGroups, Operator op, int k) |
| { |
| //fall back to sequential version if necessary |
| boolean rowVector = (target.getNumRows()==1 && target.getNumColumns()>1); |
| if( k <= 1 || (long)target.rlen*target.clen < PAR_NUMCELL_THRESHOLD1 || rowVector || target.clen==1) { |
| groupedAggregate(groups, target, weights, result, numGroups, op); |
| return; |
| } |
| |
| if( !(op instanceof CMOperator || op instanceof AggregateOperator) ) { |
| throw new DMLRuntimeException("Invalid operator (" + op + ") encountered while processing groupedAggregate."); |
| } |
| |
| //preprocessing (no need to check isThreadSafe) |
| result.sparse = false; |
| result.allocateDenseBlock(); |
| |
| //core multi-threaded grouped aggregate computation |
| //(currently: parallelization over columns to avoid additional memory requirements) |
| try { |
| ExecutorService pool = CommonThreadPool.get(k); |
| ArrayList<GrpAggTask> tasks = new ArrayList<>(); |
| int blklen = (int)(Math.ceil((double)target.clen/k)); |
| for( int i=0; i<k & i*blklen<target.clen; i++ ) |
| tasks.add( new GrpAggTask(groups, target, weights, result, numGroups, op, i*blklen, Math.min((i+1)*blklen, target.clen)) ); |
| List<Future<Object>> taskret = pool.invokeAll(tasks); |
| pool.shutdown(); |
| for(Future<Object> task : taskret) |
| task.get(); //error handling |
| } |
| catch(Exception ex) { |
| throw new DMLRuntimeException(ex); |
| } |
| |
| //postprocessing |
| result.recomputeNonZeros(); |
| result.examSparsity(); |
| } |
| |
| public static boolean isSupportedUnaryAggregateOperator( AggregateUnaryOperator op ) { |
| AggType type = getAggType( op ); |
| return (type != AggType.INVALID); |
| } |
| |
| public static boolean isSupportedUnaryOperator( UnaryOperator op ) { |
| AggType type = getAggType( op ); |
| return (type != AggType.INVALID); |
| } |
| |
| public static boolean satisfiesMultiThreadingConstraints(MatrixBlock in, MatrixBlock out, AggregateUnaryOperator uaop, int k) { |
| boolean sharedTP = (InfrastructureAnalyzer.getLocalParallelism() == k); |
| return k > 1 && out.isThreadSafe() && in.rlen > (sharedTP ? k/8 : k/2) |
| && (uaop.indexFn instanceof ReduceCol || out.clen*8*k < PAR_INTERMEDIATE_SIZE_THRESHOLD) //size |
| && in.nonZeros > (sharedTP ? PAR_NUMCELL_THRESHOLD2 : PAR_NUMCELL_THRESHOLD1); |
| } |
| |
| /** |
| * Recompute outputs (e.g., maxindex or minindex) according to block indexes from MR. |
| * TODO: this should not be part of block operations but of the MR instruction. |
| * |
| * @param out matrix block |
| * @param op aggregate unary operator |
| * @param blen number of rows/cols in a block |
| * @param ix matrix indexes |
| */ |
| public static void recomputeIndexes( MatrixBlock out, AggregateUnaryOperator op, int blen, MatrixIndexes ix ) |
| { |
| AggType type = getAggType(op); |
| if( (type == AggType.MAX_INDEX || type == AggType.MIN_INDEX) && ix.getColumnIndex()!=1 ) //MAXINDEX or MININDEX |
| { |
| int m = out.rlen; |
| double[] c = out.getDenseBlockValues(); |
| for( int i=0, cix=0; i<m; i++, cix+=2 ) |
| c[cix] = UtilFunctions.computeCellIndex(ix.getColumnIndex(), blen, (int)c[cix]-1); |
| } |
| } |
| |
| private static AggType getAggType( AggregateUnaryOperator op ) |
| { |
| ValueFunction vfn = op.aggOp.increOp.fn; |
| IndexFunction ifn = op.indexFn; |
| |
| //(kahan) sum / sum squared / trace (for ReduceDiag) |
| if( vfn instanceof KahanFunction |
| && (op.aggOp.correction == CorrectionLocationType.LASTCOLUMN || op.aggOp.correction == CorrectionLocationType.LASTROW) |
| && (ifn instanceof ReduceAll || ifn instanceof ReduceCol || ifn instanceof ReduceRow || ifn instanceof ReduceDiag) ) |
| { |
| if (vfn instanceof KahanPlus) |
| return AggType.KAHAN_SUM; |
| else if (vfn instanceof KahanPlusSq) |
| return AggType.KAHAN_SUM_SQ; |
| } |
| |
| //mean |
| if( vfn instanceof Mean |
| && (op.aggOp.correction == CorrectionLocationType.LASTTWOCOLUMNS || op.aggOp.correction == CorrectionLocationType.LASTTWOROWS) |
| && (ifn instanceof ReduceAll || ifn instanceof ReduceCol || ifn instanceof ReduceRow) ) |
| { |
| return AggType.MEAN; |
| } |
| |
| //variance |
| if( vfn instanceof CM |
| && ((CM) vfn).getAggOpType() == AggregateOperationTypes.VARIANCE |
| && (op.aggOp.correction == CorrectionLocationType.LASTFOURCOLUMNS || |
| op.aggOp.correction == CorrectionLocationType.LASTFOURROWS) |
| && (ifn instanceof ReduceAll || ifn instanceof ReduceCol || ifn instanceof ReduceRow) ) |
| { |
| return AggType.VAR; |
| } |
| |
| //prod |
| if( vfn instanceof Multiply |
| && (ifn instanceof ReduceAll || ifn instanceof ReduceCol || ifn instanceof ReduceRow)) |
| { |
| return AggType.PROD; |
| } |
| |
| //min / max |
| if( vfn instanceof Builtin && |
| (ifn instanceof ReduceAll || ifn instanceof ReduceCol || ifn instanceof ReduceRow) ) |
| { |
| BuiltinCode bfcode = ((Builtin)vfn).bFunc; |
| switch( bfcode ){ |
| case MAX: return AggType.MAX; |
| case MIN: return AggType.MIN; |
| case MAXINDEX: return AggType.MAX_INDEX; |
| case MININDEX: return AggType.MIN_INDEX; |
| default: //do nothing |
| } |
| } |
| |
| return AggType.INVALID; |
| } |
| |
| private static AggType getAggType( UnaryOperator op ) { |
| ValueFunction vfn = op.fn; |
| if( vfn instanceof Builtin ) { |
| BuiltinCode bfunc = ((Builtin) vfn).bFunc; |
| switch( bfunc ) { |
| case CUMSUM: return AggType.CUM_KAHAN_SUM; |
| case CUMPROD: return AggType.CUM_PROD; |
| case CUMMIN: return AggType.CUM_MIN; |
| case CUMMAX: return AggType.CUM_MAX; |
| case CUMSUMPROD: return AggType.CUM_SUM_PROD; |
| default: return AggType.INVALID; |
| } |
| } |
| return AggType.INVALID; |
| } |
| |
| private static void aggregateFinalResult( AggregateOperator aop, MatrixBlock out, MatrixBlock partout ) { |
| AggregateOperator laop = aop; |
| |
| //special handling for mean where the final aggregate operator (kahan plus) |
| //is not equals to the partial aggregate operator |
| if( aop.increOp.fn instanceof Mean ) { |
| laop = new AggregateOperator(0, KahanPlus.getKahanPlusFnObject(), aop.correction); |
| } |
| |
| //incremental aggregation of final results |
| if( laop.existsCorrection() ) |
| out.incrementalAggregate(laop, partout); |
| else |
| out.binaryOperationsInPlace(laop.increOp, partout); |
| } |
| |
| private static void aggregateTernaryDense(MatrixBlock in1, MatrixBlock in2, MatrixBlock in3, MatrixBlock ret, IndexFunction ixFn, int rl, int ru) |
| { |
| //compute block operations |
| KahanObject kbuff = new KahanObject(0, 0); |
| KahanPlus kplus = KahanPlus.getKahanPlusFnObject(); |
| |
| double[] a = in1.getDenseBlockValues(); |
| double[] b1 = in2.getDenseBlockValues(); |
| double[] b2 = (in3!=null) ? in3.getDenseBlockValues() : null; //if null, literal 1 |
| final int n = in1.clen; |
| |
| if( ixFn instanceof ReduceAll ) //tak+* |
| { |
| for( int i=rl, ix=rl*n; i<ru; i++ ) |
| for( int j=0; j<n; j++, ix++ ) { |
| double b2val = (b2 != null) ? b2[ix] : 1; |
| double val = a[ix] * b1[ix] * b2val; |
| kplus.execute2( kbuff, val ); |
| } |
| ret.quickSetValue(0, 0, kbuff._sum); |
| ret.quickSetValue(0, 1, kbuff._correction); |
| } |
| else //tack+* |
| { |
| double[] c = ret.getDenseBlockValues(); |
| for( int i=rl, ix=rl*n; i<ru; i++ ) |
| for( int j=0; j<n; j++, ix++ ) { |
| double b2val = (b2 != null) ? b2[ix] : 1; |
| double val = a[ix] * b1[ix] * b2val; |
| kbuff._sum = c[j]; |
| kbuff._correction = c[j+n]; |
| kplus.execute2(kbuff, val); |
| c[j] = kbuff._sum; |
| c[j+n] = kbuff._correction; |
| } |
| } |
| } |
| |
| private static void aggregateTernaryGeneric(MatrixBlock in1, MatrixBlock in2, MatrixBlock in3, MatrixBlock ret, IndexFunction ixFn, int rl, int ru) |
| { |
| //compute block operations |
| KahanObject kbuff = new KahanObject(0, 0); |
| KahanPlus kplus = KahanPlus.getKahanPlusFnObject(); |
| |
| //guaranteed to have at least one sparse input, sort by nnz, assume num cells if |
| //(potentially incorrect) in dense representation, keep null at end via stable sort |
| MatrixBlock[] blocks = new MatrixBlock[]{in1, in2, in3}; |
| Arrays.sort(blocks, new Comparator<MatrixBlock>() { |
| @Override |
| public int compare(MatrixBlock o1, MatrixBlock o2) { |
| long nnz1 = (o1!=null && o1.sparse) ? o1.nonZeros : Long.MAX_VALUE; |
| long nnz2 = (o2!=null && o2.sparse) ? o2.nonZeros : Long.MAX_VALUE; |
| return Long.compare(nnz1, nnz2); |
| } |
| }); |
| MatrixBlock lin1 = blocks[0]; |
| MatrixBlock lin2 = blocks[1]; |
| MatrixBlock lin3 = blocks[2]; |
| |
| SparseBlock a = lin1.sparseBlock; |
| final int n = in1.clen; |
| |
| if( ixFn instanceof ReduceAll ) //tak+* |
| { |
| for( int i=rl; i<ru; i++ ) |
| if( !a.isEmpty(i) ) { |
| int apos = a.pos(i); |
| int alen = a.size(i); |
| int[] aix = a.indexes(i); |
| double[] avals = a.values(i); |
| for( int j=apos; j<apos+alen; j++ ) { |
| double val1 = avals[j]; |
| double val2 = lin2.quickGetValue(i, aix[j]); |
| double val = val1 * val2; |
| if( val != 0 && lin3 != null ) |
| val *= lin3.quickGetValue(i, aix[j]); |
| kplus.execute2( kbuff, val ); |
| } |
| } |
| ret.quickSetValue(0, 0, kbuff._sum); |
| ret.quickSetValue(0, 1, kbuff._correction); |
| } |
| else //tack+* |
| { |
| double[] c = ret.getDenseBlockValues(); |
| for( int i=rl; i<ru; i++ ) |
| if( !a.isEmpty(i) ) { |
| int apos = a.pos(i); |
| int alen = a.size(i); |
| int[] aix = a.indexes(i); |
| double[] avals = a.values(i); |
| for( int j=apos; j<apos+alen; j++ ) { |
| int colIx = aix[j]; |
| double val1 = avals[j]; |
| double val2 = lin2.quickGetValue(i, colIx); |
| double val = val1 * val2; |
| if( val != 0 && lin3 != null ) |
| val *= lin3.quickGetValue(i, colIx); |
| kbuff._sum = c[colIx]; |
| kbuff._correction = c[colIx+n]; |
| kplus.execute2( kbuff, val ); |
| c[colIx] = kbuff._sum; |
| c[colIx+n] = kbuff._correction; |
| } |
| } |
| } |
| } |
| |
| |
| /** |
| * This is a specific implementation for aggregate(fn="sum"), where we use KahanPlus for numerical |
| * stability. In contrast to other functions of aggregate, this implementation supports row and column |
| * vectors for target and exploits sparse representations since KahanPlus is sparse-safe. |
| * |
| * @param groups matrix block groups |
| * @param target matrix block target |
| * @param weights matrix block weights |
| * @param result matrix block result |
| * @param numGroups number of groups |
| * @param aggop aggregate operator |
| * @param cl column lower index |
| * @param cu column upper index |
| */ |
| private static void groupedAggregateKahanPlus( MatrixBlock groups, MatrixBlock target, MatrixBlock weights, MatrixBlock result, int numGroups, AggregateOperator aggop, int cl, int cu ) { |
| boolean rowVector = (target.getNumRows()==1 && target.getNumColumns()>1); |
| double w = 1; //default weight |
| |
| //skip empty blocks (sparse-safe operation) |
| if( target.isEmptyBlock(false) ) |
| return; |
| |
| //init group buffers |
| int numCols2 = cu-cl; |
| KahanObject[][] buffer = new KahanObject[numGroups][numCols2]; |
| for( int i=0; i<numGroups; i++ ) |
| for( int j=0; j<numCols2; j++ ) |
| buffer[i][j] = new KahanObject(aggop.initialValue, 0); |
| |
| if( rowVector ) //target is rowvector |
| { |
| //note: always sequential, no need to respect cl/cu |
| |
| if( target.sparse ) //SPARSE target |
| { |
| if( !target.sparseBlock.isEmpty(0) ) |
| { |
| int pos = target.sparseBlock.pos(0); |
| int len = target.sparseBlock.size(0); |
| int[] aix = target.sparseBlock.indexes(0); |
| double[] avals = target.sparseBlock.values(0); |
| for( int j=pos; j<pos+len; j++ ) //for each nnz |
| { |
| int g = (int) groups.quickGetValue(aix[j], 0); |
| if ( g > numGroups ) |
| continue; |
| if ( weights != null ) |
| w = weights.quickGetValue(aix[j],0); |
| aggop.increOp.fn.execute(buffer[g-1][0], avals[j]*w); |
| } |
| } |
| } |
| else //DENSE target |
| { |
| double[] a = target.getDenseBlockValues(); |
| for ( int i=0; i < target.getNumColumns(); i++ ) { |
| double d = a[ i ]; |
| if( d != 0 ) //sparse-safe |
| { |
| int g = (int) groups.quickGetValue(i, 0); |
| if ( g > numGroups ) |
| continue; |
| if ( weights != null ) |
| w = weights.quickGetValue(i,0); |
| // buffer is 0-indexed, whereas range of values for g = [1,numGroups] |
| aggop.increOp.fn.execute(buffer[g-1][0], d*w); |
| } |
| } |
| } |
| } |
| else //column vector or matrix |
| { |
| if( target.sparse ) //SPARSE target |
| { |
| SparseBlock a = target.sparseBlock; |
| |
| for( int i=0; i < groups.getNumRows(); i++ ) |
| { |
| int g = (int) groups.quickGetValue(i, 0); |
| if ( g > numGroups ) |
| continue; |
| |
| if( !a.isEmpty(i) ) |
| { |
| int pos = a.pos(i); |
| int len = a.size(i); |
| int[] aix = a.indexes(i); |
| double[] avals = a.values(i); |
| int j = (cl==0) ? 0 : a.posFIndexGTE(i,cl); |
| j = (j >= 0) ? pos+j : pos+len; |
| |
| for( ; j<pos+len && aix[j]<cu; j++ ) //for each nnz |
| { |
| if ( weights != null ) |
| w = weights.quickGetValue(aix[j],0); |
| aggop.increOp.fn.execute(buffer[g-1][aix[j]-cl], avals[j]*w); |
| } |
| } |
| } |
| } |
| else //DENSE target |
| { |
| DenseBlock a = target.getDenseBlock(); |
| for( int i=0; i < groups.getNumRows(); i++ ) { |
| int g = (int) groups.quickGetValue(i, 0); |
| if ( g > numGroups ) |
| continue; |
| double[] avals = a.values(i); |
| int aix = a.pos(i); |
| for( int j=cl; j < cu; j++ ) { |
| double d = avals[ aix+j ]; |
| if( d != 0 ) { //sparse-safe |
| if ( weights != null ) |
| w = weights.quickGetValue(i,0); |
| // buffer is 0-indexed, whereas range of values for g = [1,numGroups] |
| aggop.increOp.fn.execute(buffer[g-1][j-cl], d*w); |
| } |
| } |
| } |
| } |
| } |
| |
| // extract the results from group buffers |
| for( int i=0; i < numGroups; i++ ) |
| for( int j=0; j < numCols2; j++ ) |
| result.appendValue(i, j+cl, buffer[i][j]._sum); |
| } |
| |
| private static void groupedAggregateCM( MatrixBlock groups, MatrixBlock target, MatrixBlock weights, MatrixBlock result, int numGroups, CMOperator cmOp, int cl, int cu ) { |
| CM cmFn = CM.getCMFnObject(cmOp.getAggOpType()); |
| double w = 1; //default weight |
| |
| //init group buffers |
| int numCols2 = cu-cl; |
| CM_COV_Object[][] cmValues = new CM_COV_Object[numGroups][numCols2]; |
| for ( int i=0; i < numGroups; i++ ) |
| for( int j=0; j < numCols2; j++ ) |
| cmValues[i][j] = new CM_COV_Object(); |
| |
| //column vector or matrix |
| if( target.sparse ) { //SPARSE target |
| //note: we create a sparse side input for a linear scan (w/o binary search) |
| //over the sparse representation despite the sparse-unsafe operations |
| SparseBlock a = target.sparseBlock; |
| SideInputSparseCell sa = new SideInputSparseCell( |
| new SideInput(null, target, target.clen)); |
| |
| for( int i=0; i < groups.getNumRows(); i++ ) { |
| int g = (int) groups.quickGetValue(i, 0); |
| if( g > numGroups ) continue; |
| |
| //sparse unsafe correction empty row |
| if( a.isEmpty(i) ){ |
| w = (weights != null) ? weights.quickGetValue(i,0) : w; |
| for( int j=cl; j<cu; j++ ) |
| cmFn.execute(cmValues[g-1][j-cl], 0, w); |
| continue; |
| } |
| |
| //process non-empty row |
| for( int j=cl; j<cu; j++ ) { |
| double d = sa.getValue(i, j); |
| if ( weights != null ) |
| w = weights.quickGetValue(i,0); |
| cmFn.execute(cmValues[g-1][j-cl], d, w); |
| } |
| } |
| } |
| else { //DENSE target |
| DenseBlock a = target.getDenseBlock(); |
| for( int i=0; i < groups.getNumRows(); i++ ) { |
| int g = (int) groups.quickGetValue(i, 0); |
| if ( g > numGroups ) |
| continue; |
| double[] avals = a.values(i); |
| int aix = a.pos(i); |
| for( int j=cl; j<cu; j++ ) { |
| double d = avals[ aix+j ]; //sparse unsafe |
| if ( weights != null ) |
| w = weights.quickGetValue(i,0); |
| // buffer is 0-indexed, whereas range of values for g = [1,numGroups] |
| cmFn.execute(cmValues[g-1][j-cl], d, w); |
| } |
| } |
| } |
| |
| // extract the required value from each CM_COV_Object |
| for( int i=0; i < numGroups; i++ ) |
| for( int j=0; j < numCols2; j++ ) { |
| // result is 0-indexed, so is cmValues |
| result.appendValue(i, j, cmValues[i][j+cl].getRequiredResult(cmOp)); |
| } |
| } |
| |
| private static void groupedAggregateVecCount( MatrixBlock groups, MatrixBlock result, int numGroups ) { |
| //note: groups are always dense because 0 invalid |
| if( groups.isInSparseFormat() || groups.isEmptyBlock(false) ) |
| throw new DMLRuntimeException("Unsupported sparse input for aggregate-count on group vector."); |
| |
| double[] a = groups.getDenseBlockValues(); |
| int[] tmp = new int[numGroups]; |
| int m = groups.rlen; |
| |
| //compute counts |
| for( int i = 0; i < m; i++ ) { |
| int g = (int) a[i]; |
| if ( g > numGroups ) |
| continue; |
| tmp[g-1]++; |
| } |
| |
| //copy counts into result |
| for( int i=0; i<numGroups; i++ ) { |
| result.appendValue(i, 0, tmp[i]); |
| } |
| } |
| |
| private static void aggregateBinaryMatrixAllDense(MatrixBlock in, MatrixBlock aggVal, MatrixBlock aggCorr) { |
| //allocate output arrays (if required) |
| aggVal.allocateDenseBlock(); //should always stay in dense |
| aggCorr.allocateDenseBlock(); //should always stay in dense |
| |
| double[] a = in.getDenseBlockValues(); |
| double[] c = aggVal.getDenseBlockValues(); |
| double[] cc = aggCorr.getDenseBlockValues(); |
| |
| KahanObject buffer1 = new KahanObject(0, 0); |
| KahanPlus akplus = KahanPlus.getKahanPlusFnObject(); |
| final int len = Math.min(a.length, in.rlen*in.clen); |
| |
| int nnzC = 0; |
| int nnzCC = 0; |
| |
| for( int i=0; i<len; i++ ) |
| { |
| buffer1._sum = c[i]; |
| buffer1._correction = cc[i]; |
| akplus.execute2(buffer1, a[i]); |
| c[i] = buffer1._sum; |
| cc[i] = buffer1._correction; |
| nnzC += (buffer1._sum!=0)?1:0; |
| nnzCC += (buffer1._correction!=0)?1:0; |
| } |
| |
| aggVal.nonZeros = nnzC; |
| aggCorr.nonZeros = nnzCC; |
| } |
| |
| private static void aggregateBinaryMatrixSparseDense(MatrixBlock in, MatrixBlock aggVal, MatrixBlock aggCorr) { |
| //allocate output arrays (if required) |
| aggVal.allocateDenseBlock(); //should always stay in dense |
| aggCorr.allocateDenseBlock(); //should always stay in dense |
| |
| SparseBlock a = in.getSparseBlock(); |
| double[] c = aggVal.getDenseBlockValues(); |
| double[] cc = aggCorr.getDenseBlockValues(); |
| |
| KahanObject buffer1 = new KahanObject(0, 0); |
| KahanPlus akplus = KahanPlus.getKahanPlusFnObject(); |
| |
| final int m = in.rlen; |
| final int n = in.clen; |
| final int rlen = Math.min(a.numRows(), m); |
| |
| for( int i=0, cix=0; i<rlen; i++, cix+=n ) |
| { |
| if( !a.isEmpty(i) ) |
| { |
| int apos = a.pos(i); |
| int alen = a.size(i); |
| int[] aix = a.indexes(i); |
| double[] avals = a.values(i); |
| |
| for( int j=apos; j<apos+alen; j++ ) |
| { |
| int ix = cix+aix[j]; |
| buffer1._sum = c[ix]; |
| buffer1._correction = cc[ix]; |
| akplus.execute2(buffer1, avals[j]); |
| c[ix] = buffer1._sum; |
| cc[ix] = buffer1._correction; |
| } |
| } |
| } |
| |
| aggVal.recomputeNonZeros(); |
| aggCorr.recomputeNonZeros(); |
| } |
| |
| private static void aggregateBinaryMatrixSparseGeneric(MatrixBlock in, MatrixBlock aggVal, MatrixBlock aggCorr) { |
| SparseBlock a = in.getSparseBlock(); |
| |
| KahanObject buffer1 = new KahanObject(0, 0); |
| KahanPlus akplus = KahanPlus.getKahanPlusFnObject(); |
| |
| final int m = in.rlen; |
| final int rlen = Math.min(a.numRows(), m); |
| |
| for( int i=0; i<rlen; i++ ) |
| { |
| if( !a.isEmpty(i) ) |
| { |
| int apos = a.pos(i); |
| int alen = a.size(i); |
| int[] aix = a.indexes(i); |
| double[] avals = a.values(i); |
| |
| for( int j=apos; j<apos+alen; j++ ) |
| { |
| int jix = aix[j]; |
| buffer1._sum = aggVal.quickGetValue(i, jix); |
| buffer1._correction = aggCorr.quickGetValue(i, jix); |
| akplus.execute2(buffer1, avals[j]); |
| aggVal.quickSetValue(i, jix, buffer1._sum); |
| aggCorr.quickSetValue(i, jix, buffer1._correction); |
| } |
| } |
| } |
| |
| //note: nnz of aggVal/aggCorr maintained internally |
| if( aggVal.sparse ) |
| aggVal.examSparsity(false); |
| if( aggCorr.sparse ) |
| aggCorr.examSparsity(false); |
| } |
| |
| private static void aggregateBinaryMatrixDenseGeneric(MatrixBlock in, MatrixBlock aggVal, MatrixBlock aggCorr) { |
| final int m = in.rlen; |
| final int n = in.clen; |
| |
| double[] a = in.getDenseBlockValues(); |
| |
| KahanObject buffer = new KahanObject(0, 0); |
| KahanPlus akplus = KahanPlus.getKahanPlusFnObject(); |
| |
| //incl implicit nnz maintenance |
| for(int i=0, ix=0; i<m; i++) |
| for(int j=0; j<n; j++, ix++) |
| { |
| buffer._sum = aggVal.quickGetValue(i, j); |
| buffer._correction = aggCorr.quickGetValue(i, j); |
| akplus.execute(buffer, a[ix]); |
| aggVal.quickSetValue(i, j, buffer._sum); |
| aggCorr.quickSetValue(i, j, buffer._correction); |
| } |
| |
| //note: nnz of aggVal/aggCorr maintained internally |
| if( aggVal.sparse ) |
| aggVal.examSparsity(false); |
| if( aggCorr.sparse ) |
| aggCorr.examSparsity(false); |
| } |
| |
| private static void aggregateBinaryMatrixLastRowDenseGeneric(MatrixBlock in, MatrixBlock aggVal) { |
| if( in.denseBlock==null || in.isEmptyBlock(false) ) |
| return; |
| |
| final int m = in.rlen; |
| final int n = in.clen; |
| final int cix = (m-1)*n; |
| |
| double[] a = in.getDenseBlockValues(); |
| |
| KahanObject buffer = new KahanObject(0, 0); |
| KahanPlus akplus = KahanPlus.getKahanPlusFnObject(); |
| |
| //incl implicit nnz maintenance |
| for(int i=0, ix=0; i<m-1; i++) |
| for(int j=0; j<n; j++, ix++) |
| { |
| buffer._sum = aggVal.quickGetValue(i, j); |
| buffer._correction = aggVal.quickGetValue(m-1, j); |
| akplus.execute(buffer, a[ix], a[cix+j]); |
| aggVal.quickSetValue(i, j, buffer._sum); |
| aggVal.quickSetValue(m-1, j, buffer._correction); |
| } |
| |
| //note: nnz of aggVal maintained internally |
| aggVal.examSparsity(); |
| } |
| |
| private static void aggregateBinaryMatrixLastRowSparseGeneric(MatrixBlock in, MatrixBlock aggVal) { |
| //sparse-safe operation |
| if( in.isEmptyBlock(false) ) |
| return; |
| |
| SparseBlock a = in.getSparseBlock(); |
| |
| KahanObject buffer1 = new KahanObject(0, 0); |
| KahanPlus akplus = KahanPlus.getKahanPlusFnObject(); |
| |
| final int m = in.rlen; |
| final int rlen = Math.min(a.numRows(), m); |
| |
| for( int i=0; i<rlen-1; i++ ) |
| { |
| if( !a.isEmpty(i) ) |
| { |
| int apos = a.pos(i); |
| int alen = a.size(i); |
| int[] aix = a.indexes(i); |
| double[] avals = a.values(i); |
| |
| for( int j=apos; j<apos+alen; j++ ) |
| { |
| int jix = aix[j]; |
| double corr = in.quickGetValue(m-1, jix); |
| buffer1._sum = aggVal.quickGetValue(i, jix); |
| buffer1._correction = aggVal.quickGetValue(m-1, jix); |
| akplus.execute(buffer1, avals[j], corr); |
| aggVal.quickSetValue(i, jix, buffer1._sum); |
| aggVal.quickSetValue(m-1, jix, buffer1._correction); |
| } |
| } |
| } |
| |
| //note: nnz of aggVal/aggCorr maintained internally |
| aggVal.examSparsity(); |
| } |
| |
| private static void aggregateBinaryMatrixLastColDenseGeneric(MatrixBlock in, MatrixBlock aggVal) { |
| if( in.denseBlock==null || in.isEmptyBlock(false) ) |
| return; |
| |
| final int m = in.rlen; |
| final int n = in.clen; |
| |
| double[] a = in.getDenseBlockValues(); |
| |
| KahanObject buffer = new KahanObject(0, 0); |
| KahanPlus akplus = KahanPlus.getKahanPlusFnObject(); |
| |
| //incl implicit nnz maintenance |
| for(int i=0, ix=0; i<m; i++, ix+=n) |
| for(int j=0; j<n-1; j++) |
| { |
| buffer._sum = aggVal.quickGetValue(i, j); |
| buffer._correction = aggVal.quickGetValue(i, n-1); |
| akplus.execute(buffer, a[ix+j], a[ix+j+1]); |
| aggVal.quickSetValue(i, j, buffer._sum); |
| aggVal.quickSetValue(i, n-1, buffer._correction); |
| } |
| |
| //note: nnz of aggVal maintained internally |
| aggVal.examSparsity(); |
| } |
| |
| private static void aggregateBinaryMatrixLastColSparseGeneric(MatrixBlock in, MatrixBlock aggVal) { |
| //sparse-safe operation |
| if( in.isEmptyBlock(false) ) |
| return; |
| |
| SparseBlock a = in.getSparseBlock(); |
| |
| KahanObject buffer1 = new KahanObject(0, 0); |
| KahanPlus akplus = KahanPlus.getKahanPlusFnObject(); |
| |
| final int m = in.rlen; |
| final int n = in.clen; |
| final int rlen = Math.min(a.numRows(), m); |
| |
| for( int i=0; i<rlen; i++ ) |
| { |
| if( !a.isEmpty(i) ) |
| { |
| int apos = a.pos(i); |
| int alen = a.size(i); |
| int[] aix = a.indexes(i); |
| double[] avals = a.values(i); |
| |
| for( int j=apos; j<apos+alen && aix[j]<n-1; j++ ) |
| { |
| int jix = aix[j]; |
| double corr = in.quickGetValue(i, n-1); |
| buffer1._sum = aggVal.quickGetValue(i, jix); |
| buffer1._correction = aggVal.quickGetValue(i, n-1); |
| akplus.execute(buffer1, avals[j], corr); |
| aggVal.quickSetValue(i, jix, buffer1._sum); |
| aggVal.quickSetValue(i, n-1, buffer1._correction); |
| } |
| } |
| } |
| |
| //note: nnz of aggVal/aggCorr maintained internally |
| aggVal.examSparsity(); |
| } |
| |
| private static void aggregateUnaryMatrixDense(MatrixBlock in, MatrixBlock out, AggType optype, ValueFunction vFn, IndexFunction ixFn, int rl, int ru) { |
| final int n = in.clen; |
| |
| //note: due to corrections, even the output might be a large dense block |
| DenseBlock a = in.getDenseBlock(); |
| DenseBlock c = out.getDenseBlock(); |
| |
| switch( optype ) |
| { |
| case KAHAN_SUM: { //SUM/TRACE via k+, |
| KahanObject kbuff = new KahanObject(0, 0); |
| if( ixFn instanceof ReduceAll ) // SUM |
| d_uakp(a, c, n, kbuff, (KahanPlus)vFn, rl, ru); |
| else if( ixFn instanceof ReduceCol ) //ROWSUM |
| d_uarkp(a, c, n, kbuff, (KahanPlus)vFn, rl, ru); |
| else if( ixFn instanceof ReduceRow ) //COLSUM |
| d_uackp(a, c, n, kbuff, (KahanPlus)vFn, rl, ru); |
| else if( ixFn instanceof ReduceDiag ) //TRACE |
| d_uakptrace(a, c, n, kbuff, (KahanPlus)vFn, rl, ru); |
| break; |
| } |
| case KAHAN_SUM_SQ: { //SUM_SQ via k+, |
| KahanObject kbuff = new KahanObject(0, 0); |
| if( ixFn instanceof ReduceAll ) //SUM_SQ |
| d_uasqkp(a, c, n, kbuff, (KahanPlusSq)vFn, rl, ru); |
| else if( ixFn instanceof ReduceCol ) //ROWSUM_SQ |
| d_uarsqkp(a, c, n, kbuff, (KahanPlusSq)vFn, rl, ru); |
| else if( ixFn instanceof ReduceRow ) //COLSUM_SQ |
| d_uacsqkp(a, c, n, kbuff, (KahanPlusSq)vFn, rl, ru); |
| break; |
| } |
| case CUM_KAHAN_SUM: { //CUMSUM |
| KahanObject kbuff = new KahanObject(0, 0); |
| KahanPlus kplus = KahanPlus.getKahanPlusFnObject(); |
| d_ucumkp(in.getDenseBlock(), null, out.getDenseBlock(), n, kbuff, kplus, rl, ru); |
| break; |
| } |
| case CUM_PROD: { //CUMPROD |
| d_ucumm(in.getDenseBlockValues(), null, out.getDenseBlockValues(), n, rl, ru); |
| break; |
| } |
| case CUM_MIN: |
| case CUM_MAX: { |
| double init = (optype==AggType.CUM_MAX) ? Double.NEGATIVE_INFINITY:Double.POSITIVE_INFINITY; |
| d_ucummxx(in.getDenseBlockValues(), null, out.getDenseBlockValues(), n, init, (Builtin)vFn, rl, ru); |
| break; |
| } |
| case MIN: |
| case MAX: { //MAX/MIN |
| double init = (optype==AggType.MAX) ? Double.NEGATIVE_INFINITY:Double.POSITIVE_INFINITY; |
| if( ixFn instanceof ReduceAll ) // MIN/MAX |
| d_uamxx(a, c, n, init, (Builtin)vFn, rl, ru); |
| else if( ixFn instanceof ReduceCol ) //ROWMIN/ROWMAX |
| d_uarmxx(a, c, n, init, (Builtin)vFn, rl, ru); |
| else if( ixFn instanceof ReduceRow ) //COLMIN/COLMAX |
| d_uacmxx(a, c, n, init, (Builtin)vFn, rl, ru); |
| break; |
| } |
| case MAX_INDEX: { |
| double init = Double.NEGATIVE_INFINITY; |
| if( ixFn instanceof ReduceCol ) //ROWINDEXMAX |
| d_uarimax(a, c, n, init, (Builtin)vFn, rl, ru); |
| break; |
| } |
| case MIN_INDEX: { |
| double init = Double.POSITIVE_INFINITY; |
| if( ixFn instanceof ReduceCol ) //ROWINDEXMIN |
| d_uarimin(a, c, n, init, (Builtin)vFn, rl, ru); |
| break; |
| } |
| case MEAN: { //MEAN |
| KahanObject kbuff = new KahanObject(0, 0); |
| if( ixFn instanceof ReduceAll ) // MEAN |
| d_uamean(a, c, n, kbuff, (Mean)vFn, rl, ru); |
| else if( ixFn instanceof ReduceCol ) //ROWMEAN |
| d_uarmean(a, c, n, kbuff, (Mean)vFn, rl, ru); |
| else if( ixFn instanceof ReduceRow ) //COLMEAN |
| d_uacmean(a, c, n, kbuff, (Mean)vFn, rl, ru); |
| break; |
| } |
| case VAR: { //VAR |
| CM_COV_Object cbuff = new CM_COV_Object(); |
| if( ixFn instanceof ReduceAll ) //VAR |
| d_uavar(a, c, n, cbuff, (CM)vFn, rl, ru); |
| else if( ixFn instanceof ReduceCol ) //ROWVAR |
| d_uarvar(a, c, n, cbuff, (CM)vFn, rl, ru); |
| else if( ixFn instanceof ReduceRow ) //COLVAR |
| d_uacvar(a, c, n, cbuff, (CM)vFn, rl, ru); |
| break; |
| } |
| case PROD: { //PROD |
| if( ixFn instanceof ReduceAll ) // PROD |
| d_uam(a, c, n, rl, ru ); |
| else if( ixFn instanceof ReduceCol ) //ROWPROD |
| d_uarm(a, c, n, rl, ru); |
| else if( ixFn instanceof ReduceRow ) //COLPROD |
| d_uacm(a, c, n, rl, ru); |
| break; |
| } |
| |
| default: |
| throw new DMLRuntimeException("Unsupported aggregation type: "+optype); |
| } |
| } |
| |
| private static void aggregateUnaryMatrixSparse(MatrixBlock in, MatrixBlock out, AggType optype, ValueFunction vFn, IndexFunction ixFn, int rl, int ru) { |
| final int m = in.rlen; |
| final int n = in.clen; |
| |
| //note: due to corrections, even the output might be a large dense block |
| SparseBlock a = in.getSparseBlock(); |
| DenseBlock c = out.getDenseBlock(); |
| |
| switch( optype ) |
| { |
| case KAHAN_SUM: { //SUM via k+ |
| KahanObject kbuff = new KahanObject(0, 0); |
| if( ixFn instanceof ReduceAll ) // SUM |
| s_uakp(a, c, n, kbuff, (KahanPlus)vFn, rl, ru); |
| else if( ixFn instanceof ReduceCol ) //ROWSUM |
| s_uarkp(a, c, n, kbuff, (KahanPlus)vFn, rl, ru); |
| else if( ixFn instanceof ReduceRow ) //COLSUM |
| s_uackp(a, c, n, kbuff, (KahanPlus)vFn, rl, ru); |
| else if( ixFn instanceof ReduceDiag ) //TRACE |
| s_uakptrace(a, c, n, kbuff, (KahanPlus)vFn, rl, ru); |
| break; |
| } |
| case KAHAN_SUM_SQ: { //SUM_SQ via k+ |
| KahanObject kbuff = new KahanObject(0, 0); |
| if( ixFn instanceof ReduceAll ) //SUM_SQ |
| s_uasqkp(a, c, n, kbuff, (KahanPlusSq)vFn, rl, ru); |
| else if( ixFn instanceof ReduceCol ) //ROWSUM_SQ |
| s_uarsqkp(a, c, n, kbuff, (KahanPlusSq)vFn, rl, ru); |
| else if( ixFn instanceof ReduceRow ) //COLSUM_SQ |
| s_uacsqkp(a, c, n, kbuff, (KahanPlusSq)vFn, rl, ru); |
| break; |
| } |
| case CUM_KAHAN_SUM: { //CUMSUM |
| KahanObject kbuff = new KahanObject(0, 0); |
| KahanPlus kplus = KahanPlus.getKahanPlusFnObject(); |
| s_ucumkp(a, null, out.getDenseBlock(), m, n, kbuff, kplus, rl, ru); |
| break; |
| } |
| case CUM_PROD: { //CUMPROD |
| s_ucumm(a, null, out.getDenseBlockValues(), n, rl, ru); |
| break; |
| } |
| case CUM_MIN: |
| case CUM_MAX: { |
| double init = (optype==AggType.CUM_MAX) ? Double.NEGATIVE_INFINITY:Double.POSITIVE_INFINITY; |
| s_ucummxx(a, null, out.getDenseBlockValues(), n, init, (Builtin)vFn, rl, ru); |
| break; |
| } |
| case MIN: |
| case MAX: { //MAX/MIN |
| double init = (optype==AggType.MAX) ? Double.NEGATIVE_INFINITY:Double.POSITIVE_INFINITY; |
| if( ixFn instanceof ReduceAll ) // MIN/MAX |
| s_uamxx(a, c, n, init, (Builtin)vFn, rl, ru); |
| else if( ixFn instanceof ReduceCol ) //ROWMIN/ROWMAX |
| s_uarmxx(a, c, n, init, (Builtin)vFn, rl, ru); |
| else if( ixFn instanceof ReduceRow ) //COLMIN/COLMAX |
| s_uacmxx(a, c, m, n, init, (Builtin)vFn, rl, ru); |
| break; |
| } |
| case MAX_INDEX: { |
| double init = Double.NEGATIVE_INFINITY; |
| if( ixFn instanceof ReduceCol ) //ROWINDEXMAX |
| s_uarimax(a, c, n, init, (Builtin)vFn, rl, ru); |
| break; |
| } |
| case MIN_INDEX: { |
| double init = Double.POSITIVE_INFINITY; |
| if( ixFn instanceof ReduceCol ) //ROWINDEXMAX |
| s_uarimin(a, c, n, init, (Builtin)vFn, rl, ru); |
| break; |
| } |
| case MEAN: { |
| KahanObject kbuff = new KahanObject(0, 0); |
| if( ixFn instanceof ReduceAll ) // MEAN |
| s_uamean(a, c, n, kbuff, (Mean)vFn, rl, ru); |
| else if( ixFn instanceof ReduceCol ) //ROWMEAN |
| s_uarmean(a, c, n, kbuff, (Mean)vFn, rl, ru); |
| else if( ixFn instanceof ReduceRow ) //COLMEAN |
| s_uacmean(a, c, n, kbuff, (Mean)vFn, rl, ru); |
| break; |
| } |
| case VAR: { //VAR |
| CM_COV_Object cbuff = new CM_COV_Object(); |
| if( ixFn instanceof ReduceAll ) //VAR |
| s_uavar(a, c, n, cbuff, (CM)vFn, rl, ru); |
| else if( ixFn instanceof ReduceCol ) //ROWVAR |
| s_uarvar(a, c, n, cbuff, (CM)vFn, rl, ru); |
| else if( ixFn instanceof ReduceRow ) //COLVAR |
| s_uacvar(a, c, n, cbuff, (CM)vFn, rl, ru); |
| break; |
| } |
| case PROD: { //PROD |
| if( ixFn instanceof ReduceAll ) // PROD |
| s_uam(a, c, n, rl, ru ); |
| else if( ixFn instanceof ReduceCol ) // ROWPROD |
| s_uarm(a, c, n, rl, ru ); |
| else if( ixFn instanceof ReduceRow ) // COLPROD |
| s_uacm(a, c, n, rl, ru ); |
| break; |
| } |
| |
| default: |
| throw new DMLRuntimeException("Unsupported aggregation type: "+optype); |
| } |
| } |
| |
| private static void cumaggregateUnaryMatrixDense(MatrixBlock in, MatrixBlock out, AggType optype, ValueFunction vFn, double[] agg, int rl, int ru) { |
| final int n = in.clen; |
| |
| DenseBlock da = in.getDenseBlock(); |
| DenseBlock dc = out.getDenseBlock(); |
| double[] a = in.getDenseBlockValues(); |
| double[] c = out.getDenseBlockValues(); |
| |
| switch( optype ) { |
| case CUM_KAHAN_SUM: { //CUMSUM |
| KahanObject kbuff = new KahanObject(0, 0); |
| KahanPlus kplus = KahanPlus.getKahanPlusFnObject(); |
| d_ucumkp(da, agg, dc, n, kbuff, kplus, rl, ru); |
| break; |
| } |
| case CUM_SUM_PROD: { //CUMSUMPROD |
| if( n != 2 ) |
| throw new DMLRuntimeException("Cumsumprod expects two-column input (n="+n+")."); |
| d_ucumkpp(da, agg, dc, rl, ru); |
| break; |
| } |
| case CUM_PROD: { //CUMPROD |
| d_ucumm(a, agg, c, n, rl, ru); |
| break; |
| } |
| case CUM_MIN: |
| case CUM_MAX: { |
| double init = (optype==AggType.CUM_MAX)? Double.NEGATIVE_INFINITY:Double.POSITIVE_INFINITY; |
| d_ucummxx(a, agg, c, n, init, (Builtin)vFn, rl, ru); |
| break; |
| } |
| default: |
| throw new DMLRuntimeException("Unsupported cumulative aggregation type: "+optype); |
| } |
| } |
| |
| private static void cumaggregateUnaryMatrixSparse(MatrixBlock in, MatrixBlock out, AggType optype, ValueFunction vFn, double[] agg, int rl, int ru) { |
| final int m = in.rlen; |
| final int n = in.clen; |
| |
| SparseBlock a = in.getSparseBlock(); |
| DenseBlock dc = out.getDenseBlock(); |
| double[] c = out.getDenseBlockValues(); |
| |
| switch( optype ) { |
| case CUM_KAHAN_SUM: { //CUMSUM |
| KahanObject kbuff = new KahanObject(0, 0); |
| KahanPlus kplus = KahanPlus.getKahanPlusFnObject(); |
| s_ucumkp(a, agg, dc, m, n, kbuff, kplus, rl, ru); |
| break; |
| } |
| case CUM_PROD: { //CUMPROD |
| s_ucumm(a, agg, c, n, rl, ru); |
| break; |
| } |
| case CUM_MIN: |
| case CUM_MAX: { |
| double init = (optype==AggType.CUM_MAX) ? Double.NEGATIVE_INFINITY:Double.POSITIVE_INFINITY; |
| s_ucummxx(a, agg, c, n, init, (Builtin)vFn, rl, ru); |
| break; |
| } |
| default: |
| throw new DMLRuntimeException("Unsupported cumulative aggregation type: "+optype); |
| } |
| } |
| |
| private static MatrixBlock aggregateUnaryMatrixEmpty(MatrixBlock in, MatrixBlock out, AggType optype, IndexFunction ixFn) { |
| //handle all full aggregates over matrices with zero rows or columns |
| if( ixFn instanceof ReduceAll && (in.getNumRows() == 0 || in.getNumColumns() == 0) ) { |
| double val = Double.NaN; |
| switch( optype ) { |
| case KAHAN_SUM: |
| case KAHAN_SUM_SQ: val = 0; break; |
| case MIN: val = Double.POSITIVE_INFINITY; break; |
| case MAX: val = Double.NEGATIVE_INFINITY; break; |
| case MEAN: |
| case VAR: |
| case MIN_INDEX: |
| case MAX_INDEX: |
| default: val = Double.NaN; break; |
| } |
| out.quickSetValue(0, 0, val); |
| return out; |
| } |
| |
| //handle pseudo sparse-safe operations over empty inputs |
| if(optype==AggType.KAHAN_SUM || optype==AggType.KAHAN_SUM_SQ |
| || optype==AggType.MIN || optype==AggType.MAX || optype==AggType.PROD |
| || optype == AggType.CUM_KAHAN_SUM || optype == AggType.CUM_PROD |
| || optype == AggType.CUM_MIN || optype == AggType.CUM_MAX) |
| { |
| return out; |
| } |
| |
| //compute result based on meta data only |
| switch( optype ) |
| { |
| case MAX_INDEX: { |
| if( ixFn instanceof ReduceCol ) { //ROWINDEXMAX |
| for(int i=0; i<out.rlen; i++) { |
| out.quickSetValue(i, 0, in.clen); //maxindex |
| } |
| } |
| break; |
| } |
| case MIN_INDEX: { |
| if( ixFn instanceof ReduceCol ) //ROWINDEXMIN |
| for(int i=0; i<out.rlen; i++) { |
| out.quickSetValue(i, 0, in.clen); //minindex |
| } |
| break; |
| } |
| case MEAN: { |
| if( ixFn instanceof ReduceAll ) // MEAN |
| out.quickSetValue(0, 1, in.rlen*in.clen); //count |
| else if( ixFn instanceof ReduceCol ) //ROWMEAN |
| for( int i=0; i<in.rlen; i++ ) //0-sum and 0-correction |
| out.quickSetValue(i, 1, in.clen); //count |
| else if( ixFn instanceof ReduceRow ) //COLMEAN |
| for( int j=0; j<in.clen; j++ ) //0-sum and 0-correction |
| out.quickSetValue(1, j, in.rlen); //count |
| break; |
| } |
| case VAR: { |
| // results: { var | mean, count, m2 correction, mean correction } |
| if( ixFn instanceof ReduceAll ) //VAR |
| out.quickSetValue(0, 2, in.rlen*in.clen); //count |
| else if( ixFn instanceof ReduceCol ) //ROWVAR |
| for( int i=0; i<in.rlen; i++ ) |
| out.quickSetValue(i, 2, in.clen); //count |
| else if( ixFn instanceof ReduceRow ) //COLVAR |
| for( int j=0; j<in.clen; j++ ) |
| out.quickSetValue(2, j, in.rlen); //count |
| break; |
| } |
| |
| default: |
| throw new DMLRuntimeException("Unsupported aggregation type: "+optype); |
| } |
| |
| return out; |
| } |
| |
| |
| //////////////////////////////////////////// |
| // core aggregation functions // |
| //////////////////////////////////////////// |
| |
| /** |
| * SUM, opcode: uak+, dense input. |
| * |
| * @param a ? |
| * @param c ? |
| * @param n ? |
| * @param kbuff ? |
| * @param kplus ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void d_uakp( DenseBlock a, DenseBlock c, int n, KahanObject kbuff, KahanPlus kplus, int rl, int ru ) { |
| final int bil = a.index(rl); |
| final int biu = a.index(ru-1); |
| for(int bi=bil; bi<=biu; bi++) { |
| int lpos = (bi==bil) ? a.pos(rl) : 0; |
| int len = (bi==biu) ? a.pos(ru-1)-lpos+n : a.blockSize(bi)*n; |
| sum(a.valuesAt(bi), lpos, len, kbuff, kplus); |
| } |
| c.set(kbuff); |
| } |
| |
| /** |
| * ROWSUM, opcode: uark+, dense input. |
| * |
| * @param a ? |
| * @param c ? |
| * @param n ? |
| * @param kbuff ? |
| * @param kplus ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void d_uarkp( DenseBlock a, DenseBlock c, int n, KahanObject kbuff, KahanPlus kplus, int rl, int ru ) |
| { |
| for( int i=rl; i<ru; i++ ) { |
| kbuff.set(0, 0); //reset buffer |
| sum( a.values(i), a.pos(i), n, kbuff, kplus ); |
| c.set(i, kbuff); |
| } |
| } |
| |
| /** |
| * COLSUM, opcode: uack+, dense input. |
| * |
| * @param a ? |
| * @param c ? |
| * @param n ? |
| * @param kbuff ? |
| * @param kplus ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void d_uackp( DenseBlock a, DenseBlock c, int n, KahanObject kbuff, KahanPlus kplus, int rl, int ru ) { |
| for( int i=rl; i<ru; i++ ) |
| sumAgg( a.values(i), c, a.pos(i), n, kbuff, kplus ); |
| } |
| |
| /** |
| * SUM_SQ, opcode: uasqk+, dense input. |
| * |
| * @param a Array of values to square & sum. |
| * @param c Output array to store sum and correction factor. |
| * @param n Number of values per row. |
| * @param kbuff A KahanObject to hold the current sum and |
| * correction factor for the Kahan summation |
| * algorithm. |
| * @param kplusSq A KahanPlusSq object to perform summation of |
| * squared values. |
| * @param rl Lower row limit. |
| * @param ru Upper row limit. |
| */ |
| private static void d_uasqkp(DenseBlock a, DenseBlock c, int n, KahanObject kbuff, KahanPlusSq kplusSq, int rl, int ru) |
| { |
| final int bil = a.index(rl); |
| final int biu = a.index(ru-1); |
| for(int bi=bil; bi<=biu; bi++) { |
| int lpos = (bi==bil) ? a.pos(rl) : 0; |
| int len = (bi==biu) ? a.pos(ru-1)-lpos+n : a.blockSize(bi)*n; |
| sum(a.valuesAt(bi), lpos, len, kbuff, kplusSq); |
| } |
| c.set(kbuff); |
| } |
| |
| /** |
| * ROWSUM_SQ, opcode: uarsqk+, dense input. |
| * |
| * @param a Array of values to square & sum row-wise. |
| * @param c Output array to store sum and correction factor |
| * for each row. |
| * @param n Number of values per row. |
| * @param kbuff A KahanObject to hold the current sum and |
| * correction factor for the Kahan summation |
| * algorithm. |
| * @param kplusSq A KahanPlusSq object to perform summation of |
| * squared values. |
| * @param rl Lower row limit. |
| * @param ru Upper row limit. |
| */ |
| private static void d_uarsqkp(DenseBlock a, DenseBlock c, int n, KahanObject kbuff, KahanPlusSq kplusSq, int rl, int ru) { |
| for (int i=rl; i<ru; i++) { |
| kbuff.set(0, 0); //reset buffer |
| sum(a.values(i), a.pos(i), n, kbuff, kplusSq); |
| c.set(i, kbuff); |
| } |
| } |
| |
| /** |
| * COLSUM_SQ, opcode: uacsqk+, dense input. |
| * |
| * @param a Array of values to square & sum column-wise. |
| * @param c Output array to store sum and correction factor |
| * for each column. |
| * @param n Number of values per row. |
| * @param kbuff A KahanObject to hold the current sum and |
| * correction factor for the Kahan summation |
| * algorithm. |
| * @param kplusSq A KahanPlusSq object to perform summation of |
| * squared values. |
| * @param rl Lower row limit. |
| * @param ru Upper row limit. |
| */ |
| private static void d_uacsqkp(DenseBlock a, DenseBlock c, int n, KahanObject kbuff, KahanPlusSq kplusSq, int rl, int ru) { |
| for( int i=rl; i<ru; i++ ) |
| sumAgg(a.values(i), c, a.pos(i), n, kbuff, kplusSq); |
| } |
| |
| /** |
| * CUMSUM, opcode: ucumk+, dense input. |
| * |
| * @param a ? |
| * @param agg ? |
| * @param c ? |
| * @param n ? |
| * @param kbuff ? |
| * @param kplus ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void d_ucumkp( DenseBlock a, double[] agg, DenseBlock c, int n, KahanObject kbuff, KahanPlus kplus, int rl, int ru ) { |
| //init current row sum/correction arrays w/ neutral 0 |
| DenseBlock csums = DenseBlockFactory.createDenseBlock(2, n); |
| if( agg != null ) |
| csums.set(0, agg); |
| //scan once and compute prefix sums |
| for( int i=rl; i<ru; i++ ) { |
| sumAgg( a.values(i), csums, a.pos(i), n, kbuff, kplus ); |
| c.set(i, csums.values(0)); |
| } |
| } |
| |
| /** |
| * CUMSUMPROD, opcode: ucumk+*, dense input. |
| * |
| * @param a ? |
| * @param agg ? |
| * @param c ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void d_ucumkpp( DenseBlock a, double[] agg, DenseBlock c, int rl, int ru ) { |
| //init current row sum/correction arrays w/ neutral 0 |
| double sum = (agg != null) ? agg[0] : 0; |
| //scan once and compute prefix sums |
| double[] avals = a.valuesAt(0); |
| double[] cvals = c.values(0); |
| for( int i=rl, ix=rl*2; i<ru; i++, ix+=2 ) { |
| sum = cvals[i] = avals[ix] + avals[ix+1] * sum; |
| } |
| } |
| |
| /** |
| * CUMPROD, opcode: ucum*, dense input. |
| * |
| * @param a ? |
| * @param agg ? |
| * @param c ? |
| * @param n ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void d_ucumm( double[] a, double[] agg, double[] c, int n, int rl, int ru ) |
| { |
| //init current row product array w/ neutral 1 |
| double[] cprods = (agg!=null) ? agg : new double[ n ]; |
| if( agg == null ) |
| Arrays.fill(cprods, 1); |
| |
| //scan once and compute prefix products |
| for( int i=rl, aix=rl*n; i<ru; i++, aix+=n ) { |
| productAgg( a, cprods, aix, 0, n ); |
| System.arraycopy(cprods, 0, c, aix, n); |
| } |
| } |
| |
| /** |
| * CUMMIN/CUMMAX, opcode: ucummin/ucummax, dense input. |
| * |
| * @param a ? |
| * @param agg ? |
| * @param c ? |
| * @param n ? |
| * @param init ? |
| * @param builtin ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void d_ucummxx( double[] a, double[] agg, double[] c, int n, double init, Builtin builtin, int rl, int ru ) |
| { |
| //init current row min/max array w/ extreme value |
| double[] cmxx = (agg!=null) ? agg : new double[ n ]; |
| if( agg == null ) |
| Arrays.fill(cmxx, init); |
| |
| //scan once and compute prefix min/max |
| for( int i=rl, aix=rl*n; i<ru; i++, aix+=n ) { |
| builtinAgg( a, cmxx, aix, n, builtin ); |
| System.arraycopy(cmxx, 0, c, aix, n); |
| } |
| } |
| /** |
| * TRACE, opcode: uaktrace |
| * |
| * @param a ? |
| * @param c ? |
| * @param n ? |
| * @param kbuff ? |
| * @param kplus ? |
| * @param rl ? |
| * @param ru ? |
| */ |
| private static void d_uakptrace( DenseBlock a, DenseBlock c, int n, KahanObject kbuff, KahanPlus kplus, int rl, int ru ) |
| { |
| //aggregate diag (via ix=n+1) |
| for( int i=rl; i<ru; i++ ) |
| kplus.execute2(kbuff, a.get(i, i)); |
| c.set(kbuff); |
| } |
| |
| /** |
| * MIN/MAX, opcode: uamin/uamax, dense input. |
| * |
| * @param a ? |
| * @param c ? |
| * @param n ? |
| * @param init ? |
| * @param builtin ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void d_uamxx( DenseBlock a, DenseBlock c, int n, double init, Builtin builtin, int rl, int ru ) { |
| double tmp = init; |
| final int bil = a.index(rl); |
| final int biu = a.index(ru-1); |
| for(int bi=bil; bi<=biu; bi++) { |
| int lpos = (bi==bil) ? a.pos(rl) : 0; |
| int len = (bi==biu) ? a.pos(ru-1)-lpos+n : a.blockSize(bi)*n; |
| tmp = builtin(a.valuesAt(bi), lpos, tmp, len, builtin); |
| } |
| c.set(0, 0, tmp); |
| } |
| |
| /** |
| * ROWMIN/ROWMAX, opcode: uarmin/uarmax, dense input. |
| * |
| * @param a ? |
| * @param c ? |
| * @param n ? |
| * @param init ? |
| * @param builtin ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void d_uarmxx( DenseBlock a, DenseBlock c, int n, double init, Builtin builtin, int rl, int ru ) { |
| for( int i=rl; i<ru; i++ ) |
| c.set(i, 0, builtin(a.values(i), a.pos(i), init, n, builtin)); |
| } |
| |
| /** |
| * COLMIN/COLMAX, opcode: uacmin/uacmax, dense input. |
| * |
| * @param a ? |
| * @param c ? |
| * @param n ? |
| * @param init ? |
| * @param builtin ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void d_uacmxx( DenseBlock a, DenseBlock c, int n, double init, Builtin builtin, int rl, int ru ) { |
| //init output (base for incremental agg) |
| c.set(init); |
| //execute builtin aggregate |
| double[] lc = c.values(0); //guaranteed single row |
| for( int i=rl; i<ru; i++ ) |
| builtinAgg( a.values(i), lc, a.pos(i), n, builtin ); |
| } |
| |
| /** |
| * ROWINDEXMAX, opcode: uarimax, dense input. |
| * |
| * @param a ? |
| * @param c ? |
| * @param n ? |
| * @param init ? |
| * @param builtin ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void d_uarimax( DenseBlock a, DenseBlock c, int n, double init, Builtin builtin, int rl, int ru ) { |
| if( n <= 0 ) |
| throw new DMLRuntimeException("rowIndexMax undefined for ncol="+n); |
| for( int i=rl; i<ru; i++ ) { |
| int maxindex = indexmax(a.values(i), a.pos(i), init, n, builtin); |
| c.set(i, 0, (double)maxindex + 1); |
| c.set(i, 1, a.get(i, maxindex)); //max value |
| } |
| } |
| |
| /** |
| * ROWINDEXMIN, opcode: uarimin, dense input. |
| * |
| * @param a ? |
| * @param c ? |
| * @param n ? |
| * @param init ? |
| * @param builtin ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void d_uarimin( DenseBlock a, DenseBlock c, int n, double init, Builtin builtin, int rl, int ru ) { |
| if( n <= 0 ) |
| throw new DMLRuntimeException("rowIndexMin undefined for ncol="+n); |
| for( int i=rl; i<ru; i++ ) { |
| int minindex = indexmin(a.values(i), a.pos(i), init, n, builtin); |
| c.set(i, 0, (double)minindex + 1); |
| c.set(i, 1, a.get(i, minindex)); //min value |
| } |
| } |
| |
| /** |
| * MEAN, opcode: uamean, dense input. |
| * |
| * @param a ? |
| * @param c ? |
| * @param n ? |
| * @param kbuff ? |
| * @param kmean ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void d_uamean( DenseBlock a, DenseBlock c, int n, KahanObject kbuff, Mean kmean, int rl, int ru ) |
| { |
| final int bil = a.index(rl); |
| final int biu = a.index(ru-1); |
| int tlen = 0; |
| for(int bi=bil; bi<=biu; bi++) { |
| int lpos = (bi==bil) ? a.pos(rl) : 0; |
| int len = (bi==biu) ? a.pos(ru-1)-lpos+n : a.blockSize(bi)*n; |
| mean(a.valuesAt(bi), lpos, len, 0, kbuff, kmean); |
| tlen += len; |
| } |
| c.set(0, 0, kbuff._sum); |
| c.set(0, 1, tlen); |
| c.set(0, 2, kbuff._correction); |
| } |
| |
| /** |
| * ROWMEAN, opcode: uarmean, dense input. |
| * |
| * @param a ? |
| * @param c ? |
| * @param n ? |
| * @param kbuff ? |
| * @param kmean ? |
| * @param rl ? |
| * @param ru ? |
| */ |
| private static void d_uarmean( DenseBlock a, DenseBlock c, int n, KahanObject kbuff, Mean kmean, int rl, int ru ) |
| { |
| for( int i=rl; i<ru; i++ ) { |
| kbuff.set(0, 0); //reset buffer |
| mean(a.values(i), a.pos(i), n, 0, kbuff, kmean); |
| c.set(i, 0, kbuff._sum); |
| c.set(i, 1, n); |
| c.set(i, 2, kbuff._correction); |
| } |
| } |
| |
| /** |
| * COLMEAN, opcode: uacmean, dense input. |
| * |
| * @param a ? |
| * @param c ? |
| * @param n ? |
| * @param kbuff ? |
| * @param kmean ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void d_uacmean( DenseBlock a, DenseBlock c, int n, KahanObject kbuff, Mean kmean, int rl, int ru ) { |
| //execute builtin aggregate |
| for( int i=rl; i<ru; i++ ) |
| meanAgg( a.values(i), c, a.pos(i), n, kbuff, kmean ); |
| } |
| |
| /** |
| * VAR, opcode: uavar, dense input. |
| * |
| * @param a Array of values. |
| * @param c Output array to store variance, mean, count, |
| * m2 correction factor, and mean correction factor. |
| * @param n Number of values per row. |
| * @param cbuff A CM_COV_Object to hold various intermediate |
| * values for the variance calculation. |
| * @param cm A CM object of type Variance to perform the variance |
| * calculation. |
| * @param rl Lower row limit. |
| * @param ru Upper row limit. |
| */ |
| private static void d_uavar(DenseBlock a, DenseBlock c, int n, CM_COV_Object cbuff, CM cm, int rl, int ru) { |
| final int bil = a.index(rl); |
| final int biu = a.index(ru-1); |
| for(int bi=bil; bi<=biu; bi++) { |
| int lpos = (bi==bil) ? a.pos(rl) : 0; |
| int len = (bi==biu) ? a.pos(ru-1)-lpos+n : a.blockSize(bi)*n; |
| var(a.valuesAt(bi), lpos, len, cbuff, cm); |
| } |
| // store results: { var | mean, count, m2 correction, mean correction } |
| c.set(0, 0, cbuff.getRequiredResult(AggregateOperationTypes.VARIANCE)); |
| c.set(0, 1, cbuff.mean._sum); |
| c.set(0, 2, cbuff.w); |
| c.set(0, 3, cbuff.m2._correction); |
| c.set(0, 4, cbuff.mean._correction); |
| } |
| |
| /** |
| * ROWVAR, opcode: uarvar, dense input. |
| * |
| * @param a Array of values. |
| * @param c Output array to store variance, mean, count, |
| * m2 correction factor, and mean correction factor |
| * for each row. |
| * @param n Number of values per row. |
| * @param cbuff A CM_COV_Object to hold various intermediate |
| * values for the variance calculation. |
| * @param cm A CM object of type Variance to perform the variance |
| * calculation. |
| * @param rl Lower row limit. |
| * @param ru Upper row limit. |
| */ |
| private static void d_uarvar(DenseBlock a, DenseBlock c, int n, CM_COV_Object cbuff, CM cm, int rl, int ru) { |
| // calculate variance for each row |
| for (int i=rl; i<ru; i++) { |
| cbuff.reset(); // reset buffer for each row |
| var(a.values(i), a.pos(i), n, cbuff, cm); |
| // store row results: { var | mean, count, m2 correction, mean correction } |
| c.set(i, 0, cbuff.getRequiredResult(AggregateOperationTypes.VARIANCE)); |
| c.set(i, 1, cbuff.mean._sum); |
| c.set(i, 2, cbuff.w); |
| c.set(i, 3, cbuff.m2._correction); |
| c.set(i, 4, cbuff.mean._correction); |
| } |
| } |
| |
| /** |
| * COLVAR, opcode: uacvar, dense input. |
| * |
| * @param a Array of values. |
| * @param c Output array to store variance, mean, count, |
| * m2 correction factor, and mean correction factor |
| * for each column. |
| * @param n Number of values per row. |
| * @param cbuff A CM_COV_Object to hold various intermediate |
| * values for the variance calculation. |
| * @param cm A CM object of type Variance to perform the variance |
| * calculation. |
| * @param rl Lower row limit. |
| * @param ru Upper row limit. |
| */ |
| private static void d_uacvar(DenseBlock a, DenseBlock c, int n, CM_COV_Object cbuff, CM cm, int rl, int ru) { |
| // calculate variance for each column incrementally |
| for (int i=rl; i<ru; i++) |
| varAgg(a.values(i), c, a.pos(i), n, cbuff, cm); |
| } |
| |
| /** |
| * PROD, opcode: ua*, dense input. |
| * |
| * @param a ? |
| * @param c ? |
| * @param n ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void d_uam( DenseBlock a, DenseBlock c, int n, int rl, int ru ) { |
| final int bil = a.index(rl); |
| final int biu = a.index(ru-1); |
| double tmp = 1; |
| for(int bi=bil; bi<=biu; bi++) { |
| int lpos = (bi==bil) ? a.pos(rl) : 0; |
| int len = (bi==biu) ? a.pos(ru-1)-lpos+n : a.blockSize(bi)*n; |
| tmp *= product( a.valuesAt(bi), lpos, len ); |
| } |
| c.set(0, 0, tmp); |
| } |
| |
| /** |
| * ROWPROD, opcode: uar*, dense input. |
| * |
| * @param a ? |
| * @param c ? |
| * @param n ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void d_uarm( DenseBlock a, DenseBlock c, int n, int rl, int ru ) { |
| double[] lc = c.values(0); |
| for( int i=rl; i<ru; i++ ) |
| lc[i] = product(a.values(i), a.pos(i), n); |
| } |
| |
| /** |
| * COLPROD, opcode: uac*, dense input. |
| * |
| * @param a ? |
| * @param c ? |
| * @param n ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void d_uacm( DenseBlock a, DenseBlock c, int n, int rl, int ru ) { |
| double[] lc = c.set(1).values(0); //guaranteed single row |
| for( int i=rl; i<ru; i++ ) |
| LibMatrixMult.vectMultiplyWrite(a.values(i), lc, lc, a.pos(i), 0, 0, n); |
| } |
| |
| /** |
| * SUM, opcode: uak+, sparse input. |
| * |
| * @param a ? |
| * @param c ? |
| * @param n ? |
| * @param kbuff ? |
| * @param kplus ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void s_uakp( SparseBlock a, DenseBlock c, int n, KahanObject kbuff, KahanPlus kplus, int rl, int ru ) |
| { |
| if( a.isContiguous() ) { |
| sum(a.values(rl), a.pos(rl), (int)a.size(rl, ru), kbuff, kplus); |
| } |
| else { |
| for( int i=rl; i<ru; i++ ) { |
| if( !a.isEmpty(i) ) |
| sum(a.values(i), a.pos(i), a.size(i), kbuff, kplus); |
| } |
| } |
| c.set(kbuff); |
| } |
| |
| /** |
| * ROWSUM, opcode: uark+, sparse input. |
| * |
| * @param a ? |
| * @param c ? |
| * @param n ? |
| * @param kbuff ? |
| * @param kplus ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void s_uarkp( SparseBlock a, DenseBlock c, int n, KahanObject kbuff, KahanPlus kplus, int rl, int ru ) { |
| //compute row aggregates |
| for( int i=rl; i<ru; i++ ) { |
| if( a.isEmpty(i) ) continue; |
| kbuff.set(0, 0); //reset buffer |
| sum( a.values(i), a.pos(i), a.size(i), kbuff, kplus ); |
| c.set(i, kbuff); |
| } |
| } |
| |
| /** |
| * COLSUM, opcode: uack+, sparse input. |
| * |
| * @param a ? |
| * @param c ? |
| * @param n ? |
| * @param kbuff ? |
| * @param kplus ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void s_uackp( SparseBlock a, DenseBlock c, int n, KahanObject kbuff, KahanPlus kplus, int rl, int ru ) |
| { |
| //compute column aggregates |
| if( a.isContiguous() ) { |
| sumAgg( a.values(rl), c, a.indexes(rl), a.pos(rl), (int)a.size(rl, ru), n, kbuff, kplus ); |
| } |
| else { |
| for( int i=rl; i<ru; i++ ) { |
| if( !a.isEmpty(i) ) |
| sumAgg( a.values(i), c, a.indexes(i), a.pos(i), a.size(i), n, kbuff, kplus ); |
| } |
| } |
| } |
| |
| /** |
| * SUM_SQ, opcode: uasqk+, sparse input. |
| * |
| * @param a Sparse array of values to square & sum. |
| * @param c Output array to store sum and correction factor. |
| * @param n Number of values per row. |
| * @param kbuff A KahanObject to hold the current sum and |
| * correction factor for the Kahan summation |
| * algorithm. |
| * @param kplusSq A KahanPlusSq object to perform summation of |
| * squared values. |
| * @param rl Lower row limit. |
| * @param ru Upper row limit. |
| */ |
| private static void s_uasqkp(SparseBlock a, DenseBlock c, int n, KahanObject kbuff, KahanPlusSq kplusSq, int rl, int ru ) |
| { |
| if( a.isContiguous() ) { |
| sum(a.values(rl), a.pos(rl), (int)a.size(rl, ru), kbuff, kplusSq); |
| } |
| else { |
| for (int i=rl; i<ru; i++) { |
| if (!a.isEmpty(i)) |
| sum(a.values(i), a.pos(i), a.size(i), kbuff, kplusSq); |
| } |
| } |
| c.set(kbuff); |
| } |
| |
| /** |
| * ROWSUM_SQ, opcode: uarsqk+, sparse input. |
| * |
| * @param a Sparse array of values to square & sum row-wise. |
| * @param c Output array to store sum and correction factor |
| * for each row. |
| * @param n Number of values per row. |
| * @param kbuff A KahanObject to hold the current sum and |
| * correction factor for the Kahan summation |
| * algorithm. |
| * @param kplusSq A KahanPlusSq object to perform summation of |
| * squared values. |
| * @param rl Lower row limit. |
| * @param ru Upper row limit. |
| */ |
| private static void s_uarsqkp(SparseBlock a, DenseBlock c, int n, KahanObject kbuff, KahanPlusSq kplusSq, int rl, int ru ) |
| { |
| //compute row aggregates |
| for (int i=rl; i<ru; i++) { |
| if( a.isEmpty(i) ) continue; |
| kbuff.set(0, 0); //reset buffer |
| sum(a.values(i), a.pos(i), a.size(i), kbuff, kplusSq); |
| c.set(i, kbuff); |
| } |
| } |
| |
| /** |
| * COLSUM_SQ, opcode: uacsqk+, sparse input. |
| * |
| * @param a Sparse array of values to square & sum column-wise. |
| * @param c Output array to store sum and correction factor |
| * for each column. |
| * @param n Number of values per row. |
| * @param kbuff A KahanObject to hold the current sum and |
| * correction factor for the Kahan summation |
| * algorithm. |
| * @param kplusSq A KahanPlusSq object to perform summation of |
| * squared values. |
| * @param rl Lower row limit. |
| * @param ru Upper row limit. |
| */ |
| private static void s_uacsqkp(SparseBlock a, DenseBlock c, int n, KahanObject kbuff, KahanPlusSq kplusSq, int rl, int ru ) |
| { |
| //compute column aggregates |
| if( a.isContiguous() ) { |
| sumAgg(a.values(rl), c, a.indexes(rl), a.pos(rl), (int)a.size(rl, ru), n, kbuff, kplusSq); |
| } |
| else { |
| for (int i=rl; i<ru; i++) { |
| if (!a.isEmpty(i)) |
| sumAgg(a.values(i), c, a.indexes(i), a.pos(i), a.size(i), n, kbuff, kplusSq); |
| } |
| } |
| } |
| |
| /** |
| * CUMSUM, opcode: ucumk+, sparse input. |
| * |
| * @param a ? |
| * @param agg ? |
| * @param c ? |
| * @param m ? |
| * @param n ? |
| * @param kbuff ? |
| * @param kplus ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void s_ucumkp( SparseBlock a, double[] agg, DenseBlock c, int m, int n, KahanObject kbuff, KahanPlus kplus, int rl, int ru ) |
| { |
| //init current row sum/correction arrays w/ neutral 0 |
| DenseBlock csums = DenseBlockFactory.createDenseBlock(2, n); |
| if( agg != null ) |
| csums.set(0, agg); |
| //scan once and compute prefix sums |
| for( int i=rl; i<ru; i++ ) { |
| if( !a.isEmpty(i) ) |
| sumAgg( a.values(i), csums, a.indexes(i), a.pos(i), a.size(i), n, kbuff, kplus ); |
| //always copy current sum (not sparse-safe) |
| c.set(i, csums.values(0)); |
| } |
| } |
| |
| /** |
| * CUMPROD, opcode: ucum*, sparse input. |
| * |
| * @param a ? |
| * @param agg ? |
| * @param c ? |
| * @param n ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void s_ucumm( SparseBlock a, double[] agg, double[] c, int n, int rl, int ru ) |
| { |
| //init current row prod arrays w/ neutral 1 |
| double[] cprod = (agg!=null) ? agg : new double[ n ]; |
| if( agg == null ) |
| Arrays.fill(cprod, 1); |
| |
| //init count arrays (helper, see correction) |
| int[] cnt = new int[ n ]; |
| |
| //scan once and compute prefix products |
| for( int i=rl, ix=rl*n; i<ru; i++, ix+=n ) |
| { |
| //multiply row of non-zero elements |
| if( !a.isEmpty(i) ) { |
| int apos = a.pos(i); |
| int alen = a.size(i); |
| int[] aix = a.indexes(i); |
| double[] avals = a.values(i); |
| productAgg( avals, cprod, aix, apos, 0, alen ); |
| countAgg( avals, cnt, aix, apos, alen ); |
| } |
| |
| //correction (not sparse-safe and cumulative) |
| //note: we need to determine if there are only nnz in a column |
| for( int j=0; j<n; j++ ) |
| if( cnt[j] < i+1 ) //no dense column |
| cprod[j] *= 0; |
| |
| //always copy current sum (not sparse-safe) |
| System.arraycopy(cprod, 0, c, ix, n); |
| } |
| } |
| |
| /** |
| * CUMMIN/CUMMAX, opcode: ucummin/ucummax, sparse input. |
| * |
| * @param a ? |
| * @param agg ? |
| * @param c ? |
| * @param n ? |
| * @param init ? |
| * @param builtin ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void s_ucummxx( SparseBlock a, double[] agg, double[] c, int n, double init, Builtin builtin, int rl, int ru ) |
| { |
| //init current row min/max array w/ extreme value |
| double[] cmxx = (agg!=null) ? agg : new double[ n ]; |
| if( agg == null ) |
| Arrays.fill(cmxx, init); |
| |
| //init count arrays (helper, see correction) |
| int[] cnt = new int[ n ]; |
| |
| //compute column aggregates min/max |
| for( int i=rl, ix=rl*n; i<ru; i++, ix+=n ) |
| { |
| if( !a.isEmpty(i) ) { |
| int apos = a.pos(i); |
| int alen = a.size(i); |
| int[] aix = a.indexes(i); |
| double[] avals = a.values(i); |
| builtinAgg( avals, cmxx, aix, apos, alen, builtin ); |
| countAgg( avals, cnt, aix, apos, alen ); |
| } |
| |
| //correction (not sparse-safe and cumulative) |
| //note: we need to determine if there are only nnz in a column |
| for( int j=0; j<n; j++ ) |
| if( cnt[j] < i+1 ) //no dense column |
| cmxx[j] = builtin.execute(cmxx[j], 0); |
| |
| //always copy current sum (not sparse-safe) |
| System.arraycopy(cmxx, 0, c, ix, n); |
| } |
| } |
| |
| /** |
| * TRACE, opcode: uaktrace, sparse input. |
| * |
| * @param a ? |
| * @param c ? |
| * @param n ? |
| * @param kbuff ? |
| * @param kplus ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void s_uakptrace( SparseBlock a, DenseBlock c, int n, KahanObject kbuff, KahanPlus kplus, int rl, int ru ) { |
| for( int i=rl; i<ru; i++ ) |
| if( !a.isEmpty(i) ) |
| kplus.execute2(kbuff, a.get(i,i)); |
| c.set(kbuff); |
| } |
| |
| /** |
| * MIN/MAX, opcode: uamin/uamax, sparse input. |
| * |
| * @param a ? |
| * @param c ? |
| * @param n ? |
| * @param init ? |
| * @param builtin ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void s_uamxx( SparseBlock a, DenseBlock c, int n, double init, Builtin builtin, int rl, int ru ) |
| { |
| double ret = init; //keep init val |
| |
| if( a.isContiguous() ) { |
| int alen = (int) a.size(rl, ru); |
| double val = builtin(a.values(rl), a.pos(rl), init, alen, builtin); |
| ret = builtin.execute(ret, val); |
| //correction (not sparse-safe) |
| ret = (alen<(ru-rl)*n) ? builtin.execute(ret, 0) : ret; |
| } |
| else { |
| for( int i=rl; i<ru; i++ ) { |
| if( !a.isEmpty(i) ) { |
| double lval = builtin(a.values(i), a.pos(i), init, a.size(i), builtin); |
| ret = builtin.execute(ret, lval); |
| } |
| //correction (not sparse-safe) |
| if( a.size(i) < n ) |
| ret = builtin.execute(ret, 0); |
| } |
| } |
| c.set(0, 0, ret); |
| } |
| |
| /** |
| * ROWMIN/ROWMAX, opcode: uarmin/uarmax, sparse input. |
| * |
| * @param a ? |
| * @param c ? |
| * @param n ? |
| * @param init ? |
| * @param builtin ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void s_uarmxx( SparseBlock a, DenseBlock c, int n, double init, Builtin builtin, int rl, int ru ) { |
| //init result (for empty rows) |
| c.set(rl, ru, 0, 1, init); //not sparse-safe |
| |
| for( int i=rl; i<ru; i++ ) { |
| if( !a.isEmpty(i) ) |
| c.set(i, 0, builtin(a.values(i), a.pos(i), init, a.size(i), builtin)); |
| //correction (not sparse-safe) |
| if( a.size(i) < n ) |
| c.set(i, 0, builtin.execute(c.get(i, 0), 0)); |
| } |
| } |
| |
| /** |
| * COLMIN/COLMAX, opcode: uacmin/uacmax, sparse input. |
| * |
| * @param a ? |
| * @param dc ? |
| * @param m ? |
| * @param n ? |
| * @param init ? |
| * @param builtin ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void s_uacmxx( SparseBlock a, DenseBlock dc, int m, int n, double init, Builtin builtin, int rl, int ru ) |
| { |
| //init output (base for incremental agg) |
| dc.set(init); |
| |
| //init count arrays (helper, see correction) |
| //due to missing correction guaranteed to be single block |
| double[] c = dc.valuesAt(0); |
| int[] cnt = new int[ n ]; |
| |
| //compute column aggregates min/max |
| if( a.isContiguous() ) { |
| int alen = (int) a.size(rl, ru); |
| builtinAgg( a.values(rl), c, a.indexes(rl), a.pos(rl), alen, builtin ); |
| countAgg( a.values(rl), cnt, a.indexes(rl), a.pos(rl), alen ); |
| } |
| else { |
| for( int i=rl; i<ru; i++ ) { |
| if( !a.isEmpty(i) ) { |
| int apos = a.pos(i); |
| int alen = a.size(i); |
| double[] avals = a.values(i); |
| int[] aix = a.indexes(i); |
| builtinAgg( avals, c, aix, apos, alen, builtin ); |
| countAgg( avals, cnt, aix, apos, alen ); |
| } |
| } |
| } |
| |
| //correction (not sparse-safe) |
| //note: we need to determine if there are only nnz in a column |
| // in order to know if for example a colMax of -8 is true or need |
| // to be replaced with a 0 because there was a missing nonzero. |
| for( int i=0; i<n; i++ ) |
| if( cnt[i] < ru-rl ) //no dense column |
| c[i] = builtin.execute(c[i], 0); |
| } |
| |
| /** |
| * ROWINDEXMAX, opcode: uarimax, sparse input. |
| * |
| * @param a ? |
| * @param c ? |
| * @param n ? |
| * @param init ? |
| * @param builtin ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void s_uarimax( SparseBlock a, DenseBlock c, int n, double init, Builtin builtin, int rl, int ru ) { |
| if( n <= 0 ) |
| throw new DMLRuntimeException("rowIndexMax is undefined for ncol="+n); |
| for( int i=rl; i<ru; i++ ) { |
| if( !a.isEmpty(i) ) { |
| int apos = a.pos(i); |
| int alen = a.size(i); |
| int[] aix = a.indexes(i); |
| double[] avals = a.values(i); |
| int maxindex = indexmax(a.values(i), apos, init, alen, builtin); |
| double maxvalue = avals[apos+maxindex]; |
| c.set(i, 0, (double)aix[apos+maxindex] + 1); |
| c.set(i, 1, maxvalue); |
| //correction (not sparse-safe) |
| if( alen < n && builtin.execute(0, maxvalue) == 1 ) { |
| int ix = n-1; //find last 0 value |
| for( int j=apos+alen-1; j>=apos; j--, ix-- ) |
| if( aix[j]!=ix ) |
| break; |
| c.set(i, 0, ix + 1); //max index (last) |
| c.set(i, 1, 0); //max value |
| } |
| } |
| else { //if( arow==null ) |
| //correction (not sparse-safe) |
| c.set(i, 0, n); //max index (last) |
| c.set(i, 1, 0); //max value |
| } |
| } |
| } |
| |
| /** |
| * ROWINDEXMIN, opcode: uarimin, sparse input. |
| * |
| * @param a ? |
| * @param c ? |
| * @param n ? |
| * @param init ? |
| * @param builtin ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void s_uarimin( SparseBlock a, DenseBlock c, int n, double init, Builtin builtin, int rl, int ru ) { |
| if( n <= 0 ) |
| throw new DMLRuntimeException("rowIndexMin is undefined for ncol="+n); |
| for( int i=rl; i<ru; i++ ) { |
| if( !a.isEmpty(i) ) { |
| int apos = a.pos(i); |
| int alen = a.size(i); |
| int[] aix = a.indexes(i); |
| double[] avals = a.values(i); |
| int minindex = indexmin(avals, apos, init, alen, builtin); |
| double minvalue = avals[apos+minindex]; |
| c.set(i, 0, (double)aix[apos+minindex] + 1); |
| c.set(i, 1, minvalue); //min value among non-zeros |
| //correction (not sparse-safe) |
| if(alen < n && builtin.execute(0, minvalue) == 1) { |
| int ix = n-1; //find last 0 value |
| for( int j=alen-1; j>=0; j--, ix-- ) |
| if( aix[apos+j]!=ix ) |
| break; |
| c.set(i, 0, ix + 1); //min index (last) |
| c.set(i, 1, 0); //min value |
| } |
| } |
| else { //if( arow==null ) |
| //correction (not sparse-safe) |
| c.set(i, 0, n); //min index (last) |
| c.set(i, 1, 0); //min value |
| } |
| } |
| } |
| |
| /** |
| * MEAN, opcode: uamean, sparse input. |
| * |
| * @param a ? |
| * @param c ? |
| * @param n ? |
| * @param kbuff ? |
| * @param kmean ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void s_uamean( SparseBlock a, DenseBlock c, int n, KahanObject kbuff, Mean kmean, int rl, int ru ) |
| { |
| int len = (ru-rl) * n; |
| int count = 0; |
| |
| //correction remaining tuples (not sparse-safe) |
| //note: before aggregate computation in order to |
| //exploit 0 sum (noop) and better numerical stability |
| count += (ru-rl)*n - a.size(rl, ru); |
| |
| //compute aggregate mean |
| if( a.isContiguous() ) { |
| int alen = (int) a.size(rl, ru); |
| mean(a.values(rl), a.pos(rl), alen, count, kbuff, kmean); |
| count += alen; |
| } |
| else { |
| for( int i=rl; i<ru; i++ ) { |
| if( !a.isEmpty(i) ) { |
| int alen = a.size(i); |
| mean(a.values(i), a.pos(i), alen, count, kbuff, kmean); |
| count += alen; |
| } |
| } |
| } |
| c.set(0, 0 , kbuff._sum); |
| c.set(0, 1, len); |
| c.set(0, 2, kbuff._correction); |
| } |
| |
| /** |
| * ROWMEAN, opcode: uarmean, sparse input. |
| * |
| * @param a ? |
| * @param c ? |
| * @param n ? |
| * @param kbuff ? |
| * @param kmean ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void s_uarmean( SparseBlock a, DenseBlock c, int n, KahanObject kbuff, Mean kmean, int rl, int ru ) { |
| for( int i=rl; i<ru; i++ ) { |
| //correction remaining tuples (not sparse-safe) |
| //note: before aggregate computation in order to |
| //exploit 0 sum (noop) and better numerical stability |
| int count = (a.isEmpty(i)) ? n : n-a.size(i); |
| kbuff.set(0, 0); //reset buffer |
| if( !a.isEmpty(i) ) |
| mean(a.values(i), a.pos(i), a.size(i), count, kbuff, kmean); |
| c.set(i, 0, kbuff._sum); |
| c.set(i, 1, n); |
| c.set(i, 2, kbuff._correction); |
| } |
| } |
| |
| /** |
| * COLMEAN, opcode: uacmean, sparse input. |
| * |
| * @param a ? |
| * @param c ? |
| * @param n ? |
| * @param kbuff ? |
| * @param kmean ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void s_uacmean( SparseBlock a, DenseBlock c, int n, KahanObject kbuff, Mean kmean, int rl, int ru ) |
| { |
| //correction remaining tuples (not sparse-safe) |
| //note: before aggregate computation in order to |
| //exploit 0 sum (noop) and better numerical stability |
| c.set(1, 2, 0, n, ru-rl); |
| double[] lc = c.values(1); //counts single row |
| int cpos = c.pos(1); |
| if( a.isContiguous() ) { |
| countDisAgg( a.values(rl), lc, a.indexes(rl), a.pos(rl), cpos, (int)a.size(rl, ru) ); |
| } |
| else { |
| for( int i=rl; i<ru; i++ ) { |
| if( !a.isEmpty(i) ) |
| countDisAgg( a.values(i), lc, a.indexes(i), a.pos(i), cpos, a.size(i) ); |
| } |
| } |
| |
| //compute column aggregate means |
| if( a.isContiguous() ) { |
| meanAgg( a.values(rl), c, a.indexes(rl), a.pos(rl), (int)a.size(rl, ru), n, kbuff, kmean ); |
| } |
| else { |
| for( int i=rl; i<ru; i++ ) { |
| if( !a.isEmpty(i) ) |
| meanAgg( a.values(i), c, a.indexes(i), a.pos(i), a.size(i), n, kbuff, kmean ); |
| } |
| } |
| } |
| |
| /** |
| * VAR, opcode: uavar, sparse input. |
| * |
| * @param a Sparse array of values. |
| * @param c Output array to store variance, mean, count, |
| * m2 correction factor, and mean correction factor. |
| * @param n Number of values per row. |
| * @param cbuff A CM_COV_Object to hold various intermediate |
| * values for the variance calculation. |
| * @param cm A CM object of type Variance to perform the variance |
| * calculation. |
| * @param rl Lower row limit. |
| * @param ru Upper row limit. |
| */ |
| private static void s_uavar(SparseBlock a, DenseBlock c, int n, CM_COV_Object cbuff, CM cm, int rl, int ru) { |
| // compute and store count of empty cells before aggregation |
| int count = (ru-rl)*n - (int)a.size(rl, ru); |
| cbuff.w = count; |
| // calculate aggregated variance (only using non-empty cells) |
| if( a.isContiguous() ) { |
| var(a.values(rl), a.pos(rl), (int)a.size(rl, ru), cbuff, cm); |
| } |
| else { |
| for (int i=rl; i<ru; i++) { |
| if (!a.isEmpty(i)) |
| var(a.values(i), a.pos(i), a.size(i), cbuff, cm); |
| } |
| } |
| // store results: { var | mean, count, m2 correction, mean correction } |
| c.set(0, 0, cbuff.getRequiredResult(AggregateOperationTypes.VARIANCE)); |
| c.set(0, 1, cbuff.mean._sum); |
| c.set(0, 2, cbuff.w); |
| c.set(0, 3, cbuff.m2._correction); |
| c.set(0, 4, cbuff.mean._correction); |
| } |
| |
| /** |
| * ROWVAR, opcode: uarvar, sparse input. |
| * |
| * @param a Sparse array of values. |
| * @param c Output array to store variance, mean, count, |
| * m2 correction factor, and mean correction factor. |
| * @param n Number of values per row. |
| * @param cbuff A CM_COV_Object to hold various intermediate |
| * values for the variance calculation. |
| * @param cm A CM object of type Variance to perform the variance |
| * calculation. |
| * @param rl Lower row limit. |
| * @param ru Upper row limit. |
| */ |
| private static void s_uarvar(SparseBlock a, DenseBlock c, int n, CM_COV_Object cbuff, CM cm, int rl, int ru) { |
| // calculate aggregated variance for each row |
| for( int i=rl; i<ru; i++ ) { |
| cbuff.reset(); // reset buffer for each row |
| // compute and store count of empty cells in this row before aggregation |
| int count = (a.isEmpty(i)) ? n : n-a.size(i); |
| cbuff.w = count; |
| if (!a.isEmpty(i)) |
| var(a.values(i), a.pos(i), a.size(i), cbuff, cm); |
| // store results: { var | mean, count, m2 correction, mean correction } |
| c.set(i, 0, cbuff.getRequiredResult(AggregateOperationTypes.VARIANCE)); |
| c.set(i, 1, cbuff.mean._sum); |
| c.set(i, 2, cbuff.w); |
| c.set(i, 3, cbuff.m2._correction); |
| c.set(i, 4, cbuff.mean._correction); |
| } |
| } |
| |
| /** |
| * COLVAR, opcode: uacvar, sparse input. |
| * |
| * @param a Sparse array of values. |
| * @param c Output array to store variance, mean, count, |
| * m2 correction factor, and mean correction factor. |
| * @param n Number of values per row. |
| * @param cbuff A CM_COV_Object to hold various intermediate |
| * values for the variance calculation. |
| * @param cm A CM object of type Variance to perform the variance |
| * calculation. |
| * @param rl Lower row limit. |
| * @param ru Upper row limit. |
| */ |
| private static void s_uacvar(SparseBlock a, DenseBlock c, int n, CM_COV_Object cbuff, CM cm, int rl, int ru) { |
| // compute and store counts of empty cells per column before aggregation |
| // note: column results are { var | mean, count, m2 correction, mean correction } |
| // - first, store total possible column counts in 3rd row of output |
| c.set(2, 3, 0, n, ru-rl); // counts stored in 3rd row |
| // - then subtract one from the column count for each dense value in the column |
| double[] lc = c.values(2); |
| int cpos = c.pos(2); |
| if( a.isContiguous() ) { |
| countDisAgg(a.values(rl), lc, a.indexes(rl), a.pos(rl), cpos, (int)a.size(rl, ru)); |
| } |
| else { |
| for (int i=rl; i<ru; i++) { |
| if (!a.isEmpty(i)) // counts stored in 3rd row |
| countDisAgg(a.values(i), lc, a.indexes(i), a.pos(i), cpos, a.size(i)); |
| } |
| } |
| |
| // calculate aggregated variance for each column |
| if( a.isContiguous() ) { |
| varAgg(a.values(rl), c, a.indexes(rl), a.pos(rl), (int)a.size(rl, ru), n, cbuff, cm); |
| } |
| else { |
| for (int i=rl; i<ru; i++) { |
| if (!a.isEmpty(i)) |
| varAgg(a.values(i), c, a.indexes(i), a.pos(i), a.size(i), n, cbuff, cm); |
| } |
| } |
| } |
| |
| /** |
| * PROD, opcode: ua*, sparse input. |
| * |
| * @param a ? |
| * @param c ? |
| * @param n ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void s_uam( SparseBlock a, DenseBlock c, int n, int rl, int ru ) { |
| double ret = 1; |
| for( int i=rl; i<ru; i++ ) { |
| if( !a.isEmpty(i) ) { |
| int alen = a.size(i); |
| ret *= product(a.values(i), 0, alen); |
| ret *= (alen<n) ? 0 : 1; |
| } |
| //early abort (note: in case of NaNs this is an invalid optimization) |
| if( !NAN_AWARENESS && ret==0 ) break; |
| } |
| c.set(0, 0, ret); |
| } |
| |
| /** |
| * ROWPROD, opcode: uar*, sparse input. |
| * |
| * @param a ? |
| * @param c ? |
| * @param n ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void s_uarm( SparseBlock a, DenseBlock c, int n, int rl, int ru ) { |
| double[] lc = c.valuesAt(0); |
| for( int i=rl; i<ru; i++ ) { |
| if( !a.isEmpty(i) ) { |
| int alen = a.size(i); |
| double tmp = product(a.values(i), 0, alen); |
| lc[i] = tmp * ((alen<n) ? 0 : 1); |
| } |
| } |
| } |
| |
| /** |
| * COLPROD, opcode: uac*, sparse input. |
| * |
| * @param a ? |
| * @param c ? |
| * @param n ? |
| * @param rl row lower index |
| * @param ru row upper index |
| */ |
| private static void s_uacm( SparseBlock a, DenseBlock c, int n, int rl, int ru ) { |
| double[] lc = c.set(1).valuesAt(0); |
| int[] cnt = new int[ n ]; |
| for( int i=rl; i<ru; i++ ) { |
| if( a.isEmpty(i) ) continue; |
| countAgg(a.values(i), cnt, a.indexes(i), a.pos(i), a.size(i)); |
| LibMatrixMult.vectMultiplyWrite(lc, a.values(i), lc, 0, a.pos(i), 0, a.size(i)); |
| } |
| for( int j=0; j<n; j++ ) |
| if( cnt[j] < ru-rl ) |
| lc[j] *= 0; |
| } |
| |
| |
| //////////////////////////////////////////// |
| // performance-relevant utility functions // |
| //////////////////////////////////////////// |
| |
| private static void sum(double[] a, int ai, final int len, KahanObject kbuff, KahanFunction kplus) { |
| for (int i=ai; i<ai+len; i++) |
| kplus.execute2(kbuff, a[i]); |
| } |
| |
| private static void sumAgg(double[] a, DenseBlock c, int ai, final int len, KahanObject kbuff, KahanFunction kplus) { |
| //note: output might span multiple physical blocks |
| double[] sum = c.values(0); |
| double[] corr = c.values(1); |
| int pos0 = c.pos(0), pos1 = c.pos(1); |
| for (int i=0; i<len; i++) { |
| kbuff._sum = sum[pos0+i]; |
| kbuff._correction = corr[pos1+i]; |
| kplus.execute2(kbuff, a[ai+i]); |
| sum[pos0+i] = kbuff._sum; |
| corr[pos1+i] = kbuff._correction; |
| } |
| } |
| |
| private static void sumAgg(double[] a, DenseBlock c, int[] aix, int ai, final int len, final int n, KahanObject kbuff, KahanFunction kplus) { |
| //note: output might span multiple physical blocks |
| double[] sum = c.values(0); |
| double[] corr = c.values(1); |
| int pos0 = c.pos(0), pos1 = c.pos(1); |
| for (int i=ai; i<ai+len; i++) { |
| int ix = aix[i]; |
| kbuff._sum = sum[pos0+ix]; |
| kbuff._correction = corr[pos1+ix]; |
| kplus.execute2(kbuff, a[i]); |
| sum[pos0+ix] = kbuff._sum; |
| corr[pos1+ix] = kbuff._correction; |
| } |
| } |
| |
| private static double product( double[] a, int ai, final int len ) { |
| double val = 1; |
| if( NAN_AWARENESS ) { |
| //product without early abort |
| //even if val is 0, it might turn into NaN. |
| for( int i=0; i<len; i++, ai++ ) |
| val *= a[ ai ]; |
| } |
| else { |
| //product with early abort (if 0) |
| //note: this will not work with NaNs (invalid optimization) |
| for( int i=0; i<len && val!=0; i++, ai++ ) |
| val *= a[ ai ]; |
| } |
| return val; |
| } |
| |
| private static void productAgg( double[] a, double[] c, int ai, int ci, final int len ) { |
| //always w/ NAN_AWARENESS: product without early abort; |
| //even if val is 0, it might turn into NaN. |
| //(early abort would require column-flags and branches) |
| for( int i=0; i<len; i++, ai++, ci++ ) |
| c[ ci ] *= a[ ai ]; |
| } |
| |
| private static void productAgg( double[] a, double[] c, int[] aix, int ai, int ci, final int len ) { |
| //always w/ NAN_AWARENESS: product without early abort; |
| //even if val is 0, it might turn into NaN. |
| //(early abort would require column-flags and branches) |
| for( int i=ai; i<ai+len; i++ ) |
| c[ ci + aix[i] ] *= a[ i ]; |
| } |
| |
| private static void mean( double[] a, int ai, final int len, int count, KahanObject kbuff, Mean mean ) { |
| //delta: (newvalue-buffer._sum)/count |
| for( int i=0; i<len; i++, ai++, count++ ) |
| mean.execute2(kbuff, a[ai], count+1); |
| } |
| |
| private static void meanAgg( double[] a, DenseBlock c, int ai, final int len, KahanObject kbuff, Mean mean ) { |
| //note: output might span multiple physical blocks |
| double[] sum = c.values(0); |
| double[] count = c.values(1); |
| double[] corr = c.values(2); |
| int pos0 = c.pos(0), pos1 = c.pos(1), pos2 = c.pos(2); |
| for( int i=0; i<len; i++ ) { |
| kbuff._sum = sum[pos0+i]; |
| double lcount = count[pos1+i] + 1; |
| kbuff._correction = corr[pos2+i]; |
| mean.execute2(kbuff, a[ai+i], lcount); |
| sum[pos0+i] = kbuff._sum; |
| count[pos1+i] = lcount; |
| corr[pos2+i] = kbuff._correction; |
| } |
| } |
| |
| private static void meanAgg( double[] a, DenseBlock c, int[] aix, int ai, final int len, final int n, KahanObject kbuff, Mean mean ) { |
| //note: output might span multiple physical blocks |
| double[] sum = c.values(0); |
| double[] count = c.values(1); |
| double[] corr = c.values(2); |
| int pos0 = c.pos(0), pos1 = c.pos(1), pos2 = c.pos(2); |
| for( int i=ai; i<ai+len; i++ ) { |
| int ix = aix[i]; |
| kbuff._sum = sum[pos0+ix]; |
| double lcount = count[pos1+ix] + 1; |
| kbuff._correction = corr[pos2+ix]; |
| mean.execute2(kbuff, a[ i ], lcount); |
| sum[pos0+ix] = kbuff._sum; |
| count[pos1+ix] = lcount; |
| corr[pos2+ix] = kbuff._correction; |
| } |
| } |
| |
| private static void var(double[] a, int ai, final int len, CM_COV_Object cbuff, CM cm) { |
| for(int i=0; i<len; i++, ai++) |
| cbuff = (CM_COV_Object) cm.execute(cbuff, a[ai]); |
| } |
| |
| private static void varAgg(double[] a, DenseBlock c, int ai, final int len, CM_COV_Object cbuff, CM cm) { |
| //note: output might span multiple physical blocks |
| double[] var = c.values(0); |
| double[] mean = c.values(1); |
| double[] count = c.values(2); |
| double[] m2corr = c.values(3); |
| double[] mcorr = c.values(4); |
| int pos0 = c.pos(0), pos1 = c.pos(1), |
| pos2 = c.pos(2), pos3 = c.pos(3), pos4 = c.pos(4); |
| for (int i=0; i<len; i++) { |
| // extract current values: { var | mean, count, m2 correction, mean correction } |
| cbuff.w = count[pos2+i]; // count |
| cbuff.m2._sum = var[pos0+i] * (cbuff.w - 1); // m2 = var * (n - 1) |
| cbuff.mean._sum = mean[pos1+i]; // mean |
| cbuff.m2._correction = m2corr[pos3+i]; |
| cbuff.mean._correction = mcorr[pos4+i]; |
| // calculate incremental aggregated variance |
| cbuff = (CM_COV_Object) cm.execute(cbuff, a[ai+i]); |
| // store updated values: { var | mean, count, m2 correction, mean correction } |
| var[pos0+i] = cbuff.getRequiredResult(AggregateOperationTypes.VARIANCE); |
| mean[pos1+i] = cbuff.mean._sum; |
| count[pos2+i] = cbuff.w; |
| m2corr[pos3+i] = cbuff.m2._correction; |
| mcorr[pos4+i] = cbuff.mean._correction; |
| } |
| } |
| |
| private static void varAgg(double[] a, DenseBlock c, int[] aix, int ai, final int len, final int n, CM_COV_Object cbuff, CM cm) |
| { |
| //note: output might span multiple physical blocks |
| double[] var = c.values(0); |
| double[] mean = c.values(1); |
| double[] count = c.values(2); |
| double[] m2corr = c.values(3); |
| double[] mcorr = c.values(4); |
| int pos0 = c.pos(0), pos1 = c.pos(1), |
| pos2 = c.pos(2), pos3 = c.pos(3), pos4 = c.pos(4); |
| for (int i=ai; i<ai+len; i++) { |
| // extract current values: { var | mean, count, m2 correction, mean correction } |
| int ix = aix[i]; |
| cbuff.w = count[pos2+ix]; // count |
| cbuff.m2._sum = var[pos0+ix] * (cbuff.w - 1); // m2 = var * (n - 1) |
| cbuff.mean._sum = mean[pos1+ix]; // mean |
| cbuff.m2._correction = m2corr[pos3+ix]; |
| cbuff.mean._correction = mcorr[pos4+ix]; |
| // calculate incremental aggregated variance |
| cbuff = (CM_COV_Object) cm.execute(cbuff, a[i]); |
| // store updated values: { var | mean, count, m2 correction, mean correction } |
| var[pos0+ix] = cbuff.getRequiredResult(AggregateOperationTypes.VARIANCE); |
| mean[pos1+ix] = cbuff.mean._sum; |
| count[pos2+ix] = cbuff.w; |
| m2corr[pos3+ix] = cbuff.m2._correction; |
| mcorr[pos4+ix] = cbuff.mean._correction; |
| } |
| } |
| |
| private static double builtin( double[] a, int ai, final double init, final int len, Builtin aggop ) { |
| double val = init; |
| for( int i=0; i<len; i++, ai++ ) |
| val = aggop.execute( val, a[ ai ] ); |
| return val; |
| } |
| |
| private static void builtinAgg( double[] a, double[] c, int ai, final int len, Builtin aggop ) { |
| for( int i=0; i<len; i++ ) |
| c[ i ] = aggop.execute( c[ i ], a[ ai+i ] ); |
| } |
| |
| private static void builtinAgg( double[] a, double[] c, int[] aix, int ai, final int len, Builtin aggop ) { |
| for( int i=ai; i<ai+len; i++ ) |
| c[ aix[i] ] = aggop.execute( c[ aix[i] ], a[ i ] ); |
| } |
| |
| private static int indexmax( double[] a, int ai, final double init, final int len, Builtin aggop ) { |
| double maxval = init; |
| int maxindex = -1; |
| for( int i=ai; i<ai+len; i++ ) { |
| maxindex = (a[i]>=maxval) ? i-ai : maxindex; |
| maxval = (a[i]>=maxval) ? a[i] : maxval; |
| } |
| //note: robustness for all-NaN rows |
| return Math.max(maxindex, 0); |
| } |
| |
| private static int indexmin( double[] a, int ai, final double init, final int len, Builtin aggop ) { |
| double minval = init; |
| int minindex = -1; |
| for( int i=ai; i<ai+len; i++ ) { |
| minindex = (a[i]<=minval) ? i-ai : minindex; |
| minval = (a[i]<=minval) ? a[i] : minval; |
| } |
| //note: robustness for all-NaN rows |
| return Math.max(minindex, 0); |
| } |
| |
| public static void countAgg( double[] a, int[] c, int[] aix, int ai, final int len ) { |
| final int bn = len%8; |
| //compute rest, not aligned to 8-block |
| for( int i=ai; i<ai+bn; i++ ) |
| c[ aix[i] ]++; |
| //unrolled 8-block (for better instruction level parallelism) |
| for( int i=ai+bn; i<ai+len; i+=8 ) { |
| c[ aix[ i+0 ] ] ++; |
| c[ aix[ i+1 ] ] ++; |
| c[ aix[ i+2 ] ] ++; |
| c[ aix[ i+3 ] ] ++; |
| c[ aix[ i+4 ] ] ++; |
| c[ aix[ i+5 ] ] ++; |
| c[ aix[ i+6 ] ] ++; |
| c[ aix[ i+7 ] ] ++; |
| } |
| } |
| |
| public static void countAgg( double[] a, int[] c, int ai, final int len ) { |
| final int bn = len%8; |
| //compute rest, not aligned to 8-block |
| for( int i=0; i<bn; i++ ) |
| c[i] += a[ai+i]!=0 ? 1 : 0; |
| //unrolled 8-block (for better instruction level parallelism) |
| for( int i=bn; i<len; i+=8 ) { |
| c[i+0] += a[ai+i+0]!=0 ? 1 : 0; |
| c[i+1] += a[ai+i+1]!=0 ? 1 : 0; |
| c[i+2] += a[ai+i+2]!=0 ? 1 : 0; |
| c[i+3] += a[ai+i+3]!=0 ? 1 : 0; |
| c[i+4] += a[ai+i+4]!=0 ? 1 : 0; |
| c[i+5] += a[ai+i+5]!=0 ? 1 : 0; |
| c[i+6] += a[ai+i+6]!=0 ? 1 : 0; |
| c[i+7] += a[ai+i+7]!=0 ? 1 : 0; |
| } |
| } |
| |
| private static void countDisAgg( double[] a, double[] c, int[] aix, int ai, final int ci, final int len ) { |
| final int bn = len%8; |
| //compute rest, not aligned to 8-block |
| for( int i=ai; i<ai+bn; i++ ) |
| c[ ci+aix[i] ]--; |
| //unrolled 8-block (for better instruction level parallelism) |
| for( int i=ai+bn; i<ai+len; i+=8 ) { |
| c[ ci+aix[ i+0 ] ] --; |
| c[ ci+aix[ i+1 ] ] --; |
| c[ ci+aix[ i+2 ] ] --; |
| c[ ci+aix[ i+3 ] ] --; |
| c[ ci+aix[ i+4 ] ] --; |
| c[ ci+aix[ i+5 ] ] --; |
| c[ ci+aix[ i+6 ] ] --; |
| c[ ci+aix[ i+7 ] ] --; |
| } |
| } |
| |
| ///////////////////////////////////////////////////////// |
| // Task Implementations for Multi-Threaded Operations // |
| ///////////////////////////////////////////////////////// |
| |
| private static abstract class AggTask implements Callable<Object> {} |
| |
| private static class RowAggTask extends AggTask |
| { |
| private MatrixBlock _in = null; |
| private MatrixBlock _ret = null; |
| private AggType _aggtype = null; |
| private AggregateUnaryOperator _uaop = null; |
| private int _rl = -1; |
| private int _ru = -1; |
| |
| protected RowAggTask( MatrixBlock in, MatrixBlock ret, AggType aggtype, AggregateUnaryOperator uaop, int rl, int ru ) |
| { |
| _in = in; |
| _ret = ret; |
| _aggtype = aggtype; |
| _uaop = uaop; |
| _rl = rl; |
| _ru = ru; |
| } |
| |
| @Override |
| public Object call() { |
| if( !_in.sparse ) |
| aggregateUnaryMatrixDense(_in, _ret, _aggtype, _uaop.aggOp.increOp.fn, _uaop.indexFn, _rl, _ru); |
| else |
| aggregateUnaryMatrixSparse(_in, _ret, _aggtype, _uaop.aggOp.increOp.fn, _uaop.indexFn, _rl, _ru); |
| return null; |
| } |
| } |
| |
| private static class PartialAggTask extends AggTask |
| { |
| private MatrixBlock _in = null; |
| private MatrixBlock _ret = null; |
| private AggType _aggtype = null; |
| private AggregateUnaryOperator _uaop = null; |
| private int _rl = -1; |
| private int _ru = -1; |
| |
| protected PartialAggTask( MatrixBlock in, MatrixBlock ret, AggType aggtype, AggregateUnaryOperator uaop, int rl, int ru ) { |
| _in = in; |
| _ret = ret; |
| _aggtype = aggtype; |
| _uaop = uaop; |
| _rl = rl; |
| _ru = ru; |
| } |
| |
| @Override |
| public Object call() { |
| //thead-local allocation for partial aggregation |
| _ret = new MatrixBlock(_ret.rlen, _ret.clen, false); |
| _ret.allocateDenseBlock(); |
| |
| if( !_in.sparse ) |
| aggregateUnaryMatrixDense(_in, _ret, _aggtype, _uaop.aggOp.increOp.fn, _uaop.indexFn, _rl, _ru); |
| else |
| aggregateUnaryMatrixSparse(_in, _ret, _aggtype, _uaop.aggOp.increOp.fn, _uaop.indexFn, _rl, _ru); |
| |
| //recompute non-zeros of partial result |
| _ret.recomputeNonZeros(); |
| |
| return null; |
| } |
| |
| public MatrixBlock getResult() { |
| return _ret; |
| } |
| } |
| |
| private static class CumAggTask implements Callable<Long> |
| { |
| private MatrixBlock _in = null; |
| private double[] _agg = null; |
| private MatrixBlock _ret = null; |
| private AggType _aggtype = null; |
| private UnaryOperator _uop = null; |
| private int _rl = -1; |
| private int _ru = -1; |
| |
| protected CumAggTask( MatrixBlock in, double[] agg, MatrixBlock ret, AggType aggtype, UnaryOperator uop, int rl, int ru ) { |
| _in = in; |
| _agg = agg; |
| _ret = ret; |
| _aggtype = aggtype; |
| _uop = uop; |
| _rl = rl; |
| _ru = ru; |
| } |
| |
| @Override |
| public Long call() { |
| //compute partial cumulative aggregate |
| if( !_in.sparse ) |
| cumaggregateUnaryMatrixDense(_in, _ret, _aggtype, _uop.fn, _agg, _rl, _ru); |
| else |
| cumaggregateUnaryMatrixSparse(_in, _ret, _aggtype, _uop.fn, _agg, _rl, _ru); |
| //recompute partial non-zeros (ru exlusive) |
| return _ret.recomputeNonZeros(_rl, _ru-1, 0, _ret.getNumColumns()-1); |
| } |
| } |
| |
| private static class AggTernaryTask implements Callable<MatrixBlock> |
| { |
| private final MatrixBlock _in1; |
| private final MatrixBlock _in2; |
| private final MatrixBlock _in3; |
| private MatrixBlock _ret = null; |
| private final IndexFunction _ixFn; |
| private final int _rl; |
| private final int _ru; |
| |
| protected AggTernaryTask( MatrixBlock in1, MatrixBlock in2, MatrixBlock in3, MatrixBlock ret, IndexFunction ixFn, int rl, int ru ) { |
| _in1 = in1; |
| _in2 = in2; |
| _in3 = in3; |
| _ret = ret; |
| _ixFn = ixFn; |
| _rl = rl; |
| _ru = ru; |
| } |
| |
| @Override |
| public MatrixBlock call() { |
| //thead-local allocation for partial aggregation |
| _ret = new MatrixBlock(_ret.rlen, _ret.clen, false); |
| _ret.allocateDenseBlock(); |
| |
| if( !_in1.sparse && !_in2.sparse && (_in3==null||!_in3.sparse) ) //DENSE |
| aggregateTernaryDense(_in1, _in2, _in3, _ret, _ixFn, _rl, _ru); |
| else //GENERAL CASE |
| aggregateTernaryGeneric(_in1, _in2, _in3, _ret, _ixFn, _rl, _ru); |
| |
| //recompute non-zeros of partial result |
| _ret.recomputeNonZeros(); |
| |
| return _ret; |
| } |
| } |
| |
| private static class GrpAggTask extends AggTask |
| { |
| private MatrixBlock _groups = null; |
| private MatrixBlock _target = null; |
| private MatrixBlock _weights = null; |
| private MatrixBlock _ret = null; |
| private int _numGroups = -1; |
| private Operator _op = null; |
| private int _cl = -1; |
| private int _cu = -1; |
| |
| protected GrpAggTask( MatrixBlock groups, MatrixBlock target, MatrixBlock weights, MatrixBlock ret, int numGroups, Operator op, int cl, int cu ) { |
| _groups = groups; |
| _target = target; |
| _weights = weights; |
| _ret = ret; |
| _numGroups = numGroups; |
| _op = op; |
| _cl = cl; |
| _cu = cu; |
| } |
| |
| @Override |
| public Object call() { |
| //CM operator for count, mean, variance |
| //note: current support only for column vectors |
| if( _op instanceof CMOperator ) { |
| CMOperator cmOp = (CMOperator) _op; |
| groupedAggregateCM(_groups, _target, _weights, _ret, _numGroups, cmOp, _cl, _cu); |
| } |
| //Aggregate operator for sum (via kahan sum) |
| //note: support for row/column vectors and dense/sparse |
| else if( _op instanceof AggregateOperator ) { |
| AggregateOperator aggop = (AggregateOperator) _op; |
| groupedAggregateKahanPlus(_groups, _target, _weights, _ret, _numGroups, aggop, _cl, _cu); |
| } |
| return null; |
| } |
| } |
| } |