| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.sysds.runtime.codegen; |
| |
| import java.io.Serializable; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Future; |
| |
| import org.apache.sysds.runtime.DMLRuntimeException; |
| import org.apache.sysds.runtime.data.DenseBlock; |
| import org.apache.sysds.runtime.data.SparseBlock; |
| import org.apache.sysds.runtime.functionobjects.Builtin; |
| import org.apache.sysds.runtime.functionobjects.KahanFunction; |
| import org.apache.sysds.runtime.functionobjects.KahanPlus; |
| import org.apache.sysds.runtime.functionobjects.KahanPlusSq; |
| import org.apache.sysds.runtime.functionobjects.ValueFunction; |
| import org.apache.sysds.runtime.functionobjects.Builtin.BuiltinCode; |
| import org.apache.sysds.runtime.instructions.cp.DoubleObject; |
| import org.apache.sysds.runtime.instructions.cp.KahanObject; |
| import org.apache.sysds.runtime.instructions.cp.ScalarObject; |
| import org.apache.sysds.runtime.matrix.data.LibMatrixMult; |
| import org.apache.sysds.runtime.matrix.data.MatrixBlock; |
| import org.apache.sysds.runtime.util.CommonThreadPool; |
| import org.apache.sysds.runtime.util.UtilFunctions; |
| |
| public abstract class SpoofCellwise extends SpoofOperator implements Serializable |
| { |
| private static final long serialVersionUID = 3442528770573293590L; |
| |
| public enum CellType { |
| NO_AGG, |
| FULL_AGG, |
| ROW_AGG, |
| COL_AGG, |
| } |
| |
| //redefinition of Hop.AggOp for cleaner imports in generate class |
| public enum AggOp { |
| SUM, |
| SUM_SQ, |
| MIN, |
| MAX, |
| } |
| |
| private final CellType _type; |
| private final AggOp _aggOp; |
| private final boolean _sparseSafe; |
| private final boolean _containsSeq; |
| |
| public SpoofCellwise(CellType type, boolean sparseSafe, boolean containsSeq, AggOp aggOp) { |
| _type = type; |
| _aggOp = aggOp; |
| _sparseSafe = sparseSafe; |
| _containsSeq = containsSeq; |
| } |
| |
| public CellType getCellType() { |
| return _type; |
| } |
| |
| public AggOp getAggOp() { |
| return _aggOp; |
| } |
| |
| public boolean isSparseSafe() { |
| return _sparseSafe; |
| } |
| |
| public boolean containsSeq() { |
| return _containsSeq; |
| } |
| |
| @Override |
| public String getSpoofType() { |
| return "Cell" + getClass().getName().split("\\.")[1]; |
| } |
| |
| private ValueFunction getAggFunction() { |
| switch( _aggOp ) { |
| case SUM: return KahanPlus.getKahanPlusFnObject(); |
| case SUM_SQ: return KahanPlusSq.getKahanPlusSqFnObject(); |
| case MIN: return Builtin.getBuiltinFnObject(BuiltinCode.MIN); |
| case MAX: return Builtin.getBuiltinFnObject(BuiltinCode.MAX); |
| default: |
| throw new RuntimeException("Unsupported " |
| + "aggregation type: "+_aggOp.name()); |
| } |
| } |
| |
| @Override |
| public ScalarObject execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalarObjects, int k) { |
| return execute(inputs, scalarObjects, k, 0); |
| } |
| |
| public ScalarObject execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalarObjects, int k, long rix) { |
| //sanity check |
| if( inputs==null || inputs.size() < 1 ) |
| throw new RuntimeException("Invalid input arguments."); |
| |
| //input preparation |
| MatrixBlock a = inputs.get(0); |
| SideInput[] b = prepInputMatrices(inputs); |
| double[] scalars = prepInputScalars(scalarObjects); |
| final int m = a.getNumRows(); |
| final int n = a.getNumColumns(); |
| |
| //sparse safe check |
| boolean sparseSafe = isSparseSafe() || (b.length == 0 |
| && genexec( 0, b, scalars, m, n, 0, 0 ) == 0); |
| |
| long inputSize = sparseSafe ? |
| getTotalInputNnz(inputs) : getTotalInputSize(inputs); |
| if( inputSize < PAR_NUMCELL_THRESHOLD ) { |
| k = 1; //serial execution |
| } |
| |
| double ret = 0; |
| if( k <= 1 ) //SINGLE-THREADED |
| { |
| if( !inputs.get(0).isInSparseFormat() ) |
| ret = executeDenseAndAgg(a.getDenseBlock(), b, scalars, m, n, sparseSafe, 0, m, rix); |
| else |
| ret = executeSparseAndAgg(a.getSparseBlock(), b, scalars, m, n, sparseSafe, 0, m, rix); |
| } |
| else //MULTI-THREADED |
| { |
| try { |
| ExecutorService pool = CommonThreadPool.get(k); |
| ArrayList<ParAggTask> tasks = new ArrayList<>(); |
| int nk = UtilFunctions.roundToNext(Math.min(8*k,m/32), k); |
| int blklen = (int)(Math.ceil((double)m/nk)); |
| for( int i=0; i<nk & i*blklen<m; i++ ) |
| tasks.add(new ParAggTask(a, b, scalars, m, n, sparseSafe, i*blklen, Math.min((i+1)*blklen, m))); |
| //execute tasks |
| List<Future<Double>> taskret = pool.invokeAll(tasks); |
| pool.shutdown(); |
| |
| //aggregate partial results |
| ValueFunction vfun = getAggFunction(); |
| if( vfun instanceof KahanFunction ) { |
| KahanObject kbuff = new KahanObject(0, 0); |
| KahanPlus kplus = KahanPlus.getKahanPlusFnObject(); |
| for( Future<Double> task : taskret ) |
| kplus.execute2(kbuff, task.get()); |
| ret = kbuff._sum; |
| } |
| else { |
| for( Future<Double> task : taskret ) |
| ret = vfun.execute(ret, task.get()); |
| } |
| } |
| catch(Exception ex) { |
| throw new DMLRuntimeException(ex); |
| } |
| } |
| |
| //correction for min/max |
| if( (_aggOp == AggOp.MIN || _aggOp == AggOp.MAX) && sparseSafe |
| && a.getNonZeros()<a.getNumRows()*a.getNumColumns() ) |
| ret = getAggFunction().execute(ret, 0); //unseen 0 might be max or min value |
| |
| return new DoubleObject(ret); |
| } |
| |
| @Override |
| public MatrixBlock execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalarObjects, MatrixBlock out) { |
| return execute(inputs, scalarObjects, out, 1, 0); |
| } |
| |
| @Override |
| public MatrixBlock execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalarObjects, MatrixBlock out, int k) { |
| return execute(inputs, scalarObjects, out, k, 0); |
| } |
| |
| public MatrixBlock execute(ArrayList<MatrixBlock> inputs, ArrayList<ScalarObject> scalarObjects, MatrixBlock out, int k, long rix) { |
| //sanity check |
| if( inputs==null || inputs.size() < 1 || out==null ) |
| throw new RuntimeException("Invalid input arguments."); |
| |
| //input preparation |
| MatrixBlock a = inputs.get(0); |
| SideInput[] b = prepInputMatrices(inputs); |
| double[] scalars = prepInputScalars(scalarObjects); |
| final int m = a.getNumRows(); |
| final int n = a.getNumColumns(); |
| |
| //sparse safe check |
| boolean sparseSafe = isSparseSafe() || (b.length == 0 |
| && genexec( 0, b, scalars, m, n, 0, 0 ) == 0); |
| |
| long inputSize = sparseSafe ? |
| getTotalInputNnz(inputs) : getTotalInputSize(inputs); |
| if( inputSize < PAR_NUMCELL_THRESHOLD ) { |
| k = 1; //serial execution |
| } |
| |
| //result allocation and preparations |
| boolean sparseOut = _type == CellType.NO_AGG |
| && sparseSafe && a.isInSparseFormat(); |
| switch( _type ) { |
| case NO_AGG: out.reset(m, n, sparseOut); break; |
| case ROW_AGG: out.reset(m, 1, false); break; |
| case COL_AGG: out.reset(1, n, false); break; |
| default: throw new DMLRuntimeException("Invalid cell type: "+_type); |
| } |
| out.allocateBlock(); |
| |
| long lnnz = 0; |
| if( k <= 1 ) //SINGLE-THREADED |
| { |
| if( !inputs.get(0).isInSparseFormat() ) |
| lnnz = executeDense(a.getDenseBlock(), b, scalars, out, m, n, sparseSafe, 0, m, rix); |
| else |
| lnnz = executeSparse(a.getSparseBlock(), b, scalars, out, m, n, sparseSafe, 0, m, rix); |
| } |
| else //MULTI-THREADED |
| { |
| try { |
| ExecutorService pool = CommonThreadPool.get(k); |
| ArrayList<ParExecTask> tasks = new ArrayList<>(); |
| int nk = UtilFunctions.roundToNext(Math.min(8*k,m/32), k); |
| int blklen = (int)(Math.ceil((double)m/nk)); |
| for( int i=0; i<nk & i*blklen<m; i++ ) |
| tasks.add(new ParExecTask(a, b, scalars, out, m, n, |
| sparseSafe, i*blklen, Math.min((i+1)*blklen, m))); |
| //execute tasks |
| List<Future<Long>> taskret = pool.invokeAll(tasks); |
| pool.shutdown(); |
| |
| //aggregate nnz and error handling |
| for( Future<Long> task : taskret ) |
| lnnz += task.get(); |
| if( _type == CellType.COL_AGG ) { |
| //aggregate partial results |
| double[] c = out.getDenseBlockValues(); |
| ValueFunction vfun = getAggFunction(); |
| if( vfun instanceof KahanFunction ) { |
| for( ParExecTask task : tasks ) |
| LibMatrixMult.vectAdd(task.getResult().getDenseBlockValues(), c, 0, 0, n); |
| } |
| else { |
| for( ParExecTask task : tasks ) { |
| double[] tmp = task.getResult().getDenseBlockValues(); |
| for(int j=0; j<n; j++) |
| c[j] = vfun.execute(c[j], tmp[j]); |
| } |
| } |
| lnnz = out.recomputeNonZeros(); |
| } |
| } |
| catch(Exception ex) { |
| throw new DMLRuntimeException(ex); |
| } |
| } |
| |
| //post-processing |
| out.setNonZeros(lnnz); |
| out.examSparsity(); |
| return out; |
| } |
| |
| ///////// |
| //function dispatch |
| |
| private long executeDense(DenseBlock a, SideInput[] b, double[] scalars, |
| MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru, long rix) { |
| DenseBlock c = out.getDenseBlock(); |
| SideInput[] lb = createSparseSideInputs(b); |
| |
| if( _type == CellType.NO_AGG ) { |
| return executeDenseNoAgg(a, lb, scalars, c, m, n, sparseSafe, rl, ru, rix); |
| } |
| else if( _type == CellType.ROW_AGG ) { |
| if( _aggOp == AggOp.SUM || _aggOp == AggOp.SUM_SQ ) |
| return executeDenseRowAggSum(a, lb, scalars, c, m, n, sparseSafe, rl, ru, rix); |
| else |
| return executeDenseRowAggMxx(a, lb, scalars, c, m, n, sparseSafe, rl, ru, rix); |
| } |
| else if( _type == CellType.COL_AGG ) { |
| if( _aggOp == AggOp.SUM || _aggOp == AggOp.SUM_SQ ) |
| return executeDenseColAggSum(a, lb, scalars, c, m, n, sparseSafe, rl, ru, rix); |
| else |
| return executeDenseColAggMxx(a, lb, scalars, c, m, n, sparseSafe, rl, ru, rix); |
| } |
| return -1; |
| } |
| |
| private double executeDenseAndAgg(DenseBlock a, SideInput[] b, double[] scalars, |
| int m, int n, boolean sparseSafe, int rl, int ru, long rix) |
| { |
| SideInput[] lb = createSparseSideInputs(b); |
| |
| //numerically stable aggregation for sum/sum_sq |
| if( _aggOp == AggOp.SUM || _aggOp == AggOp.SUM_SQ ) |
| return executeDenseAggSum(a, lb, scalars, m, n, sparseSafe, rl, ru, rix); |
| else |
| return executeDenseAggMxx(a, lb, scalars, m, n, sparseSafe, rl, ru, rix); |
| } |
| |
| private long executeSparse(SparseBlock sblock, SideInput[] b, double[] scalars, |
| MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru, long rix) |
| { |
| if( sparseSafe && sblock == null ) |
| return 0; |
| |
| SideInput[] lb = createSparseSideInputs(b); |
| if( _type == CellType.NO_AGG ) { |
| if( out.isInSparseFormat() ) |
| return executeSparseNoAggSparse(sblock, lb, scalars, out, m, n, sparseSafe, rl, ru, rix); |
| else |
| return executeSparseNoAggDense(sblock, lb, scalars, out, m, n, sparseSafe, rl, ru, rix); |
| } |
| else if( _type == CellType.ROW_AGG ) { |
| if( _aggOp == AggOp.SUM || _aggOp == AggOp.SUM_SQ ) |
| return executeSparseRowAggSum(sblock, lb, scalars, out, m, n, sparseSafe, rl, ru, rix); |
| else |
| return executeSparseRowAggMxx(sblock, lb, scalars, out, m, n, sparseSafe, rl, ru, rix); |
| } |
| else if( _type == CellType.COL_AGG ) { |
| if( _aggOp == AggOp.SUM || _aggOp == AggOp.SUM_SQ ) |
| return executeSparseColAggSum(sblock, lb, scalars, out, m, n, sparseSafe, rl, ru, rix); |
| else |
| return executeSparseColAggMxx(sblock, lb, scalars, out, m, n, sparseSafe, rl, ru, rix); |
| } |
| |
| return -1; |
| } |
| |
| private double executeSparseAndAgg(SparseBlock sblock, SideInput[] b, double[] scalars, |
| int m, int n, boolean sparseSafe, int rl, int ru, long rix) |
| { |
| if( sparseSafe && sblock == null ) |
| return 0; |
| |
| SideInput[] lb = createSparseSideInputs(b); |
| if( _aggOp == AggOp.SUM || _aggOp == AggOp.SUM_SQ ) |
| return executeSparseAggSum(sblock, lb, scalars, m, n, sparseSafe, rl, ru, rix); |
| else |
| return executeSparseAggMxx(sblock, lb, scalars, m, n, sparseSafe, rl, ru, rix); |
| } |
| |
| ///////// |
| //core operator skeletons for dense, sparse, and compressed |
| |
| private long executeDenseNoAgg(DenseBlock a, SideInput[] b, double[] scalars, |
| DenseBlock c, int m, int n, boolean sparseSafe, int rl, int ru, long rix) |
| { |
| long lnnz = 0; |
| if( a == null && !sparseSafe ) { |
| for( int i=rl; i<ru; i++ ) { |
| double[] cvals = c.values(i); |
| int cix = c.pos(i); |
| for( int j=0; j<n; j++ ) |
| lnnz += ((cvals[cix+j] = genexec(0, b, scalars, m, n, rix+i, i, j))!=0) ? 1 : 0; |
| } |
| } |
| else if( a != null ) { |
| for( int i=rl; i<ru; i++ ) { |
| double[] avals = a.values(i); |
| double[] cvals = c.values(i); |
| int ix = a.pos(i); |
| for( int j=0; j<n; j++ ) { |
| double aval = avals[ix+j]; |
| if( aval != 0 || !sparseSafe) |
| lnnz += ((cvals[ix+j] = genexec(aval, b, scalars, m, n, rix+i, i, j))!=0) ? 1 : 0; |
| } |
| } |
| } |
| |
| return lnnz; |
| } |
| |
| private long executeDenseRowAggSum(DenseBlock a, SideInput[] b, double[] scalars, |
| DenseBlock c, int m, int n, boolean sparseSafe, int rl, int ru, long rix) |
| { |
| //note: output always single block |
| double[] lc = c.valuesAt(0); |
| KahanFunction kplus = (KahanFunction) getAggFunction(); |
| KahanObject kbuff = new KahanObject(0, 0); |
| |
| long lnnz = 0; |
| if( a == null && !sparseSafe ) { |
| for( int i=rl; i<ru; i++ ) { |
| kbuff.set(0, 0); |
| for( int j=0; j<n; j++ ) |
| kplus.execute2(kbuff, genexec(0, b, scalars, m, n, rix+i, i, j)); |
| lnnz += ((lc[i] = kbuff._sum)!=0) ? 1 : 0; |
| } |
| } |
| else if( a != null ) { |
| for( int i=rl; i<ru; i++ ) { |
| kbuff.set(0, 0); |
| double[] avals = a.values(i); |
| int aix = a.pos(i); |
| for( int j=0; j<n; j++ ) { |
| double aval = avals[aix+j]; |
| if( aval != 0 || !sparseSafe) |
| kplus.execute2(kbuff, genexec(aval, b, scalars, m, n, rix+i, i, j)); |
| } |
| lnnz += ((lc[i] = kbuff._sum)!=0) ? 1 : 0; |
| } |
| } |
| |
| return lnnz; |
| } |
| |
| private long executeDenseRowAggMxx(DenseBlock a, SideInput[] b, double[] scalars, |
| DenseBlock c, int m, int n, boolean sparseSafe, int rl, int ru, long rix) |
| { |
| double[] lc = c.valuesAt(0); //single block |
| |
| double initialVal = (_aggOp==AggOp.MIN) ? Double.POSITIVE_INFINITY : Double.NEGATIVE_INFINITY; |
| ValueFunction vfun = getAggFunction(); |
| long lnnz = 0; |
| if( a == null && !sparseSafe ) { //empty |
| for( int i=rl; i<ru; i++ ) { |
| double tmp = initialVal; |
| for( int j=0; j<n; j++ ) |
| tmp = vfun.execute(tmp, genexec(0, b, scalars, m, n, rix+i, i, j)); |
| lnnz += ((lc[i] = tmp)!=0) ? 1 : 0; |
| } |
| } |
| else if( a != null ) { //general case |
| for( int i=rl; i<ru; i++ ) { |
| double tmp = initialVal; |
| double[] avals = a.values(i); |
| int aix = a.pos(i); |
| for( int j=0; j<n; j++ ) { |
| double aval = avals[aix + j]; |
| if( aval != 0 || !sparseSafe) |
| tmp = vfun.execute(tmp, genexec(aval, b, scalars, m, n, rix+i, i, j)); |
| } |
| if( sparseSafe && UtilFunctions.containsZero(avals, aix, n) ) |
| tmp = vfun.execute(tmp, 0); |
| lnnz += ((lc[i] = tmp)!=0) ? 1 : 0; |
| } |
| } |
| return lnnz; |
| } |
| |
| private long executeDenseColAggSum(DenseBlock a, SideInput[] b, double[] scalars, |
| DenseBlock c, int m, int n, boolean sparseSafe, int rl, int ru, long rix) |
| { |
| double[] lc = c.valuesAt(0); //single block |
| |
| KahanFunction kplus = (KahanFunction) getAggFunction(); |
| KahanObject kbuff = new KahanObject(0, 0); |
| double[] corr = new double[n]; |
| |
| if( a == null && !sparseSafe ) { |
| for( int i=rl; i<ru; i++ ) |
| for( int j=0; j<n; j++ ) { |
| kbuff.set(lc[j], corr[j]); |
| kplus.execute2(kbuff, genexec(0, b, scalars, m, n, rix+i, i, j)); |
| lc[j] = kbuff._sum; |
| corr[j] = kbuff._correction; |
| } |
| } |
| else if( a != null ) { |
| for( int i=rl; i<ru; i++ ) { |
| double[] avals = a.values(i); |
| int aix = a.pos(i); |
| for( int j=0; j<n; j++ ) { |
| double aval = avals[aix + j]; |
| if( aval != 0 || !sparseSafe ) { |
| kbuff.set(lc[j], corr[j]); |
| kplus.execute2(kbuff, genexec(aval, b, scalars, m, n, rix+i, i, j)); |
| lc[j] = kbuff._sum; |
| corr[j] = kbuff._correction; |
| } |
| } |
| } |
| } |
| |
| return -1; |
| } |
| |
| private long executeDenseColAggMxx(DenseBlock a, SideInput[] b, double[] scalars, |
| DenseBlock c, int m, int n, boolean sparseSafe, int rl, int ru, long rix) |
| { |
| double[] lc = c.valuesAt(0); //single block |
| |
| double initialVal = (_aggOp==AggOp.MIN) ? Double.POSITIVE_INFINITY : Double.NEGATIVE_INFINITY; |
| ValueFunction vfun = getAggFunction(); |
| Arrays.fill(lc, initialVal); |
| |
| if( a == null && !sparseSafe ) { //empty |
| for( int i=rl; i<ru; i++ ) |
| for( int j=0; j<n; j++ ) |
| lc[j] = vfun.execute(lc[j], genexec(0, b, scalars, m, n, rix+i, i, j)); |
| } |
| else if( a != null ) { //general case |
| int[] counts = new int[n]; |
| for( int i=rl; i<ru; i++ ) { |
| double[] avals = a.values(i); |
| int aix = a.pos(i); |
| for( int j=0; j<n; j++ ) { |
| double aval = avals[aix + j]; |
| if( aval != 0 || !sparseSafe ) { |
| lc[j] = vfun.execute(lc[j], genexec(aval, b, scalars, m, n, rix+i, i, j)); |
| counts[j] ++; |
| } |
| } |
| } |
| if( sparseSafe ) |
| for(int j=0; j<n; j++) |
| if( counts[j] != ru-rl ) |
| lc[j] = vfun.execute(lc[j], 0); |
| } |
| return -1; |
| } |
| |
| private double executeDenseAggSum(DenseBlock a, SideInput[] b, double[] scalars, |
| int m, int n, boolean sparseSafe, int rl, int ru, long rix) |
| { |
| KahanFunction kplus = (KahanFunction) getAggFunction(); |
| KahanObject kbuff = new KahanObject(0, 0); |
| |
| if( a == null && !sparseSafe ) { |
| for( int i=rl; i<ru; i++ ) |
| for( int j=0; j<n; j++ ) |
| kplus.execute2(kbuff, genexec(0, b, scalars, m, n, rix+i, i, j)); |
| } |
| else if( a != null ) { |
| for( int i=rl; i<ru; i++ ) { |
| double[] avals = a.values(i); |
| int aix = a.pos(i); |
| for( int j=0; j<n; j++ ) { |
| double aval = avals[aix + j]; |
| if( aval != 0 || !sparseSafe) |
| kplus.execute2(kbuff, genexec(aval, b, scalars, m, n, rix+i, i, j)); |
| } |
| } |
| } |
| |
| return kbuff._sum; |
| } |
| |
| private double executeDenseAggMxx(DenseBlock a, SideInput[] b, double[] scalars, |
| int m, int n, boolean sparseSafe, int rl, int ru, long rix) |
| { |
| //safe aggregation for min/max w/ handling of zero entries |
| //note: sparse safe with zero value as min/max handled outside |
| double ret = (_aggOp==AggOp.MIN) ? Double.POSITIVE_INFINITY : Double.NEGATIVE_INFINITY; |
| ValueFunction vfun = getAggFunction(); |
| |
| if( a == null && !sparseSafe ) { |
| for( int i=rl; i<ru; i++ ) |
| for( int j=0; j<n; j++ ) |
| ret = vfun.execute(ret, genexec(0, b, scalars, m, n, rix+i, i, j)); |
| } |
| else if( a != null ) { |
| for( int i=rl; i<ru; i++ ) { |
| double[] avals = a.values(i); |
| int aix = a.pos(i); |
| for( int j=0; j<n; j++ ) { |
| double aval = avals[aix + j]; |
| if( aval != 0 || !sparseSafe) |
| ret = vfun.execute(ret, genexec(aval, b, scalars, m, n, rix+i, i, j)); |
| } |
| } |
| } |
| |
| return ret; |
| } |
| |
| private long executeSparseNoAggSparse(SparseBlock sblock, SideInput[] b, double[] scalars, |
| MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru, long rix) |
| { |
| //note: sequential scan algorithm for both sparse-safe and -unsafe |
| //in order to avoid binary search for sparse-unsafe |
| SparseBlock c = out.getSparseBlock(); |
| long lnnz = 0; |
| for(int i=rl; i<ru; i++) { |
| int lastj = -1; |
| //handle non-empty rows |
| if( sblock != null && !sblock.isEmpty(i) ) { |
| int apos = sblock.pos(i); |
| int alen = sblock.size(i); |
| int[] aix = sblock.indexes(i); |
| double[] avals = sblock.values(i); |
| c.allocate(i, sparseSafe ? alen : n); |
| for(int k=apos; k<apos+alen; k++) { |
| //process zeros before current non-zero |
| if( !sparseSafe ) |
| for(int j=lastj+1; j<aix[k]; j++) |
| c.append(i, j, genexec(0, b, scalars, m, n, rix+i, i, j)); |
| //process current non-zero |
| lastj = aix[k]; |
| c.append(i, lastj, genexec(avals[k], b, scalars, m, n, rix+i, i, lastj)); |
| } |
| } |
| //process empty rows or remaining zeros |
| if( !sparseSafe ) |
| for(int j=lastj+1; j<n; j++) |
| c.append(i, j, genexec(0, b, scalars, m, n, rix+i, i, j)); |
| lnnz += c.size(i); |
| } |
| |
| return lnnz; |
| } |
| |
| private long executeSparseNoAggDense(SparseBlock sblock, SideInput[] b, double[] scalars, |
| MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru, long rix) |
| { |
| //note: sequential scan algorithm for both sparse-safe and -unsafe |
| //in order to avoid binary search for sparse-unsafe |
| DenseBlock c = out.getDenseBlock(); |
| long lnnz = 0; |
| for(int i=rl; i<ru; i++) { |
| int lastj = -1; |
| //handle non-empty rows |
| if( sblock != null && !sblock.isEmpty(i) ) { |
| int apos = sblock.pos(i); |
| int alen = sblock.size(i); |
| int[] aix = sblock.indexes(i); |
| double[] avals = sblock.values(i); |
| double[] cvals = c.values(i); |
| int cix = c.pos(i); |
| for(int k=apos; k<apos+alen; k++) { |
| //process zeros before current non-zero |
| if( !sparseSafe ) |
| for(int j=lastj+1; j<aix[k]; j++) |
| lnnz += ((cvals[cix+j]=genexec(0, b, scalars, m, n, rix+i, i, j))!=0)?1:0; |
| //process current non-zero |
| lastj = aix[k]; |
| lnnz += ((cvals[cix+lastj]=genexec(avals[k], b, scalars, m, n, rix+i, i, lastj))!=0)?1:0; |
| } |
| } |
| //process empty rows or remaining zeros |
| if( !sparseSafe ) |
| for(int j=lastj+1; j<n; j++) { |
| double[] cvals = c.values(i); |
| int cix = c.pos(i); |
| lnnz += ((cvals[cix+j]=genexec(0, b, scalars, m, n, rix+i, i, j))!=0)?1:0; |
| } |
| } |
| return lnnz; |
| } |
| |
| private long executeSparseRowAggSum(SparseBlock sblock, SideInput[] b, double[] scalars, |
| MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru, long rix) |
| { |
| KahanFunction kplus = (KahanFunction) getAggFunction(); |
| KahanObject kbuff = new KahanObject(0, 0); |
| |
| //note: sequential scan algorithm for both sparse-safe and -unsafe |
| //in order to avoid binary search for sparse-unsafe |
| double[] c = out.getDenseBlockValues(); |
| long lnnz = 0; |
| for(int i=rl; i<ru; i++) { |
| kbuff.set(0, 0); |
| int lastj = -1; |
| //handle non-empty rows |
| if( sblock != null && !sblock.isEmpty(i) ) { |
| int apos = sblock.pos(i); |
| int alen = sblock.size(i); |
| int[] aix = sblock.indexes(i); |
| double[] avals = sblock.values(i); |
| for(int k=apos; k<apos+alen; k++) { |
| //process zeros before current non-zero |
| if( !sparseSafe ) |
| for(int j=lastj+1; j<aix[k]; j++) |
| kplus.execute2(kbuff, genexec(0, b, scalars, m, n, rix+i, i, j)); |
| //process current non-zero |
| lastj = aix[k]; |
| kplus.execute2(kbuff, genexec(avals[k], b, scalars, m, n, rix+i, i, lastj)); |
| } |
| } |
| //process empty rows or remaining zeros |
| if( !sparseSafe ) |
| for(int j=lastj+1; j<n; j++) |
| kplus.execute2(kbuff, genexec(0, b, scalars, m, n, rix+i, i, j)); |
| lnnz += ((c[i] = kbuff._sum)!=0) ? 1 : 0; |
| } |
| return lnnz; |
| } |
| |
| private long executeSparseRowAggMxx(SparseBlock sblock, SideInput[] b, double[] scalars, |
| MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru, long rix) |
| { |
| double initialVal = (_aggOp==AggOp.MIN) ? Double.POSITIVE_INFINITY : Double.NEGATIVE_INFINITY; |
| ValueFunction vfun = getAggFunction(); |
| |
| //note: sequential scan algorithm for both sparse-safe and -unsafe |
| //in order to avoid binary search for sparse-unsafe |
| double[] c = out.getDenseBlockValues(); |
| long lnnz = 0; |
| for(int i=rl; i<ru; i++) { |
| double tmp = (sparseSafe && sblock.size(i) < n) ? 0 : initialVal; |
| int lastj = -1; |
| //handle non-empty rows |
| if( sblock != null && !sblock.isEmpty(i) ) { |
| int apos = sblock.pos(i); |
| int alen = sblock.size(i); |
| int[] aix = sblock.indexes(i); |
| double[] avals = sblock.values(i); |
| for(int k=apos; k<apos+alen; k++) { |
| //process zeros before current non-zero |
| if( !sparseSafe ) |
| for(int j=lastj+1; j<aix[k]; j++) |
| tmp = vfun.execute(tmp, genexec(0, b, scalars, m, n, rix+i, i, j)); |
| //process current non-zero |
| lastj = aix[k]; |
| tmp = vfun.execute( tmp, genexec(avals[k], b, scalars, m, n, rix+i, i, lastj)); |
| } |
| } |
| //process empty rows or remaining zeros |
| if( !sparseSafe ) |
| for(int j=lastj+1; j<n; j++) |
| tmp = vfun.execute(tmp, genexec(0, b, scalars, m, n, rix+i, i, j)); |
| lnnz += ((c[i] = tmp)!=0) ? 1 : 0; |
| } |
| return lnnz; |
| } |
| |
| private long executeSparseColAggSum(SparseBlock sblock, SideInput[] b, double[] scalars, |
| MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru, long rix) |
| { |
| KahanFunction kplus = (KahanFunction) getAggFunction(); |
| KahanObject kbuff = new KahanObject(0, 0); |
| double[] corr = new double[n]; |
| |
| //note: sequential scan algorithm for both sparse-safe and -unsafe |
| //in order to avoid binary search for sparse-unsafe |
| double[] c = out.getDenseBlockValues(); |
| for(int i=rl; i<ru; i++) { |
| kbuff.set(0, 0); |
| int lastj = -1; |
| //handle non-empty rows |
| if( sblock != null && !sblock.isEmpty(i) ) { |
| int apos = sblock.pos(i); |
| int alen = sblock.size(i); |
| int[] aix = sblock.indexes(i); |
| double[] avals = sblock.values(i); |
| for(int k=apos; k<apos+alen; k++) { |
| //process zeros before current non-zero |
| if( !sparseSafe ) |
| for(int j=lastj+1; j<aix[k]; j++) { |
| kbuff.set(c[j], corr[j]); |
| kplus.execute2(kbuff, genexec(0, b, scalars, m, n, rix+i, i, j)); |
| c[j] = kbuff._sum; |
| corr[j] = kbuff._correction; |
| } |
| //process current non-zero |
| lastj = aix[k]; |
| kbuff.set(c[aix[k]], corr[aix[k]]); |
| kplus.execute2(kbuff, genexec(avals[k], b, scalars, m, n, rix+i, i, lastj)); |
| c[aix[k]] = kbuff._sum; |
| corr[aix[k]] = kbuff._correction; |
| } |
| } |
| //process empty rows or remaining zeros |
| if( !sparseSafe ) |
| for(int j=lastj+1; j<n; j++) { |
| kbuff.set(c[j], corr[j]); |
| kplus.execute2(kbuff, genexec(0, b, scalars, m, n, rix+i, i, j)); |
| c[j] = kbuff._sum; |
| corr[j] = kbuff._correction; |
| } |
| } |
| return -1; |
| } |
| |
| private long executeSparseColAggMxx(SparseBlock sblock, SideInput[] b, double[] scalars, |
| MatrixBlock out, int m, int n, boolean sparseSafe, int rl, int ru, long rix) |
| { |
| double initialVal = (_aggOp==AggOp.MIN) ? Double.POSITIVE_INFINITY : Double.NEGATIVE_INFINITY; |
| ValueFunction vfun = getAggFunction(); |
| double[] c = out.getDenseBlockValues(); |
| Arrays.fill(c, initialVal); |
| int[] count = new int[n]; |
| |
| //note: sequential scan algorithm for both sparse-safe and -unsafe |
| //in order to avoid binary search for sparse-unsafe |
| for(int i=rl; i<ru; i++) { |
| int lastj = -1; |
| //handle non-empty rows |
| if( sblock != null && !sblock.isEmpty(i) ) { |
| int apos = sblock.pos(i); |
| int alen = sblock.size(i); |
| int[] aix = sblock.indexes(i); |
| double[] avals = sblock.values(i); |
| for(int k=apos; k<apos+alen; k++) { |
| //process zeros before current non-zero |
| if( !sparseSafe ) |
| for(int j=lastj+1; j<aix[k]; j++) { |
| c[j] = vfun.execute(c[j], genexec(0, b, scalars, m, n, rix+i, i, j)); |
| count[j] ++; |
| } |
| //process current non-zero |
| lastj = aix[k]; |
| c[aix[k]] = vfun.execute(c[aix[k]], genexec(avals[k], b, scalars, m, n, rix+i, i, lastj)); |
| count[aix[k]] ++; |
| } |
| } |
| //process empty rows or remaining zeros |
| if( !sparseSafe ) |
| for(int j=lastj+1; j<n; j++) |
| c[j] = vfun.execute(c[j], genexec(0, b, scalars, m, n, rix+i, i, j)); |
| } |
| |
| return -1; |
| } |
| |
| private double executeSparseAggSum(SparseBlock sblock, SideInput[] b, double[] scalars, |
| int m, int n, boolean sparseSafe, int rl, int ru, long rix) |
| { |
| KahanFunction kplus = (KahanFunction) getAggFunction(); |
| KahanObject kbuff = new KahanObject(0, 0); |
| |
| //note: sequential scan algorithm for both sparse-safe and -unsafe |
| //in order to avoid binary search for sparse-unsafe |
| for(int i=rl; i<ru; i++) { |
| int lastj = -1; |
| //handle non-empty rows |
| if( sblock != null && !sblock.isEmpty(i) ) { |
| int apos = sblock.pos(i); |
| int alen = sblock.size(i); |
| int[] aix = sblock.indexes(i); |
| double[] avals = sblock.values(i); |
| for(int k=apos; k<apos+alen; k++) { |
| //process zeros before current non-zero |
| if( !sparseSafe ) |
| for(int j=lastj+1; j<aix[k]; j++) |
| kplus.execute2(kbuff, genexec(0, b, scalars, m, n, rix+i, i, j)); |
| //process current non-zero |
| lastj = aix[k]; |
| kplus.execute2(kbuff, genexec(avals[k], b, scalars, m, n, rix+i, i, lastj)); |
| } |
| } |
| //process empty rows or remaining zeros |
| if( !sparseSafe ) |
| for(int j=lastj+1; j<n; j++) |
| kplus.execute2(kbuff, genexec(0, b, scalars, m, n, rix+i, i, j)); |
| } |
| return kbuff._sum; |
| } |
| |
| private double executeSparseAggMxx(SparseBlock sblock, SideInput[] b, double[] scalars, |
| int m, int n, boolean sparseSafe, int rl, int ru, long rix) |
| { |
| double ret = (_aggOp==AggOp.MIN) ? Double.POSITIVE_INFINITY : Double.NEGATIVE_INFINITY; |
| ret = (sparseSafe && sblock.size() < (long)m*n) ? 0 : ret; |
| ValueFunction vfun = getAggFunction(); |
| |
| //note: sequential scan algorithm for both sparse-safe and -unsafe |
| //in order to avoid binary search for sparse-unsafe |
| for(int i=rl; i<ru; i++) { |
| int lastj = -1; |
| //handle non-empty rows |
| if( sblock != null && !sblock.isEmpty(i) ) { |
| int apos = sblock.pos(i); |
| int alen = sblock.size(i); |
| int[] aix = sblock.indexes(i); |
| double[] avals = sblock.values(i); |
| for(int k=apos; k<apos+alen; k++) { |
| //process zeros before current non-zero |
| if( !sparseSafe ) |
| for(int j=lastj+1; j<aix[k]; j++) |
| ret = vfun.execute(ret, genexec(0, b, scalars, m, n, rix+i, i, j)); |
| //process current non-zero |
| lastj = aix[k]; |
| ret = vfun.execute(ret, genexec(avals[k], b, scalars, m, n, rix+i, i, lastj)); |
| } |
| } |
| //process empty rows or remaining zeros |
| if( !sparseSafe ) |
| for(int j=lastj+1; j<n; j++) |
| ret = vfun.execute(ret, genexec(0, b, scalars, m, n, rix+i, i, j)); |
| } |
| return ret; |
| } |
| |
| //local execution where grix==rix |
| protected final double genexec( double a, SideInput[] b, |
| double[] scalars, int m, int n, int rix, int cix) { |
| return genexec(a, b, scalars, m, n, rix, rix, cix); |
| } |
| |
| //distributed execution with additional global row index |
| protected abstract double genexec( double a, SideInput[] b, |
| double[] scalars, int m, int n, long gix, int rix, int cix); |
| |
| private class ParAggTask implements Callable<Double> |
| { |
| private final MatrixBlock _a; |
| private final SideInput[] _b; |
| private final double[] _scalars; |
| private final int _rlen; |
| private final int _clen; |
| private final boolean _safe; |
| private final int _rl; |
| private final int _ru; |
| |
| protected ParAggTask( MatrixBlock a, SideInput[] b, double[] scalars, |
| int rlen, int clen, boolean sparseSafe, int rl, int ru ) { |
| _a = a; |
| _b = b; |
| _scalars = scalars; |
| _rlen = rlen; |
| _clen = clen; |
| _safe = sparseSafe; |
| _rl = rl; |
| _ru = ru; |
| } |
| |
| @Override |
| public Double call() { |
| if (!_a.isInSparseFormat()) |
| return executeDenseAndAgg(_a.getDenseBlock(), _b, _scalars, _rlen, _clen, _safe, _rl, _ru, 0); |
| else |
| return executeSparseAndAgg(_a.getSparseBlock(), _b, _scalars, _rlen, _clen, _safe, _rl, _ru, 0); |
| } |
| } |
| |
| private class ParExecTask implements Callable<Long> |
| { |
| private final MatrixBlock _a; |
| private final SideInput[] _b; |
| private final double[] _scalars; |
| private MatrixBlock _c; |
| private final int _rlen; |
| private final int _clen; |
| private final boolean _safe; |
| private final int _rl; |
| private final int _ru; |
| |
| protected ParExecTask( MatrixBlock a, SideInput[] b, double[] scalars, MatrixBlock c, |
| int rlen, int clen, boolean sparseSafe, int rl, int ru ) { |
| _a = a; |
| _b = b; |
| _scalars = scalars; |
| _c = c; |
| _rlen = rlen; |
| _clen = clen; |
| _safe = sparseSafe; |
| _rl = rl; |
| _ru = ru; |
| } |
| |
| @Override |
| public Long call() { |
| if( _type==CellType.COL_AGG ) { |
| _c = new MatrixBlock(1,_clen, false); |
| _c.allocateDenseBlock(); |
| } |
| if( !_a.isInSparseFormat() ) |
| return executeDense(_a.getDenseBlock(), _b, _scalars, _c, _rlen, _clen, _safe, _rl, _ru, 0); |
| else |
| return executeSparse(_a.getSparseBlock(), _b, _scalars, _c, _rlen, _clen, _safe, _rl, _ru, 0); |
| } |
| |
| public MatrixBlock getResult() { |
| return _c; |
| } |
| } |
| } |