blob: 437cf975346c7434df54b8534e422242f4108d70 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.sysds.runtime.compress.lib;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
import org.apache.sysds.runtime.compress.CompressionSettings;
import org.apache.sysds.runtime.compress.colgroup.ColGroup;
import org.apache.sysds.runtime.compress.colgroup.ColGroupUncompressed;
import org.apache.sysds.runtime.functionobjects.Builtin;
import org.apache.sysds.runtime.functionobjects.Builtin.BuiltinCode;
import org.apache.sysds.runtime.functionobjects.IndexFunction;
import org.apache.sysds.runtime.functionobjects.KahanFunction;
import org.apache.sysds.runtime.functionobjects.KahanPlus;
import org.apache.sysds.runtime.functionobjects.KahanPlusSq;
import org.apache.sysds.runtime.functionobjects.Mean;
import org.apache.sysds.runtime.functionobjects.Plus;
import org.apache.sysds.runtime.functionobjects.ReduceAll;
import org.apache.sysds.runtime.functionobjects.ReduceCol;
import org.apache.sysds.runtime.functionobjects.ReduceRow;
import org.apache.sysds.runtime.instructions.cp.KahanObject;
import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
import org.apache.sysds.runtime.util.CommonThreadPool;
public class LibCompAgg {
// private static final Log LOG = LogFactory.getLog(LibCompAgg.class.getName());
// private static final long MIN_PAR_AGG_THRESHOLD = 8 * 1024 * 1024;
private static final long MIN_PAR_AGG_THRESHOLD = 8;
private static ThreadLocal<MatrixBlock> memPool = new ThreadLocal<MatrixBlock>() {
protected MatrixBlock initialValue() {
return null;
public static MatrixBlock aggregateUnary(CompressedMatrixBlock inputMatrix, MatrixBlock outputMatrix,
AggregateUnaryOperator op, int blen, MatrixIndexes indexesIn, boolean inCP) {
if(inputMatrix.getColGroups() != null) {
fillStart(outputMatrix, op);
if(inputMatrix.isOverlapping() &&
(op.aggOp.increOp.fn instanceof KahanPlusSq || (op.aggOp.increOp.fn instanceof Builtin &&
(((Builtin) op.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MIN ||
((Builtin) op.aggOp.increOp.fn).getBuiltinCode() == BuiltinCode.MAX))))
aggregateUnaryOverlapping(inputMatrix, outputMatrix, op, indexesIn, inCP);
aggregateUnaryNormalCompressedMatrixBlock(inputMatrix, outputMatrix, op, blen, indexesIn, inCP);
if(op.aggOp.existsCorrection() && inCP)
return outputMatrix;
private static void aggregateUnaryNormalCompressedMatrixBlock(CompressedMatrixBlock inputMatrix,
MatrixBlock outputMatrix, AggregateUnaryOperator op, int blen, MatrixIndexes indexesIn, boolean inCP) {
aggregateUncompressedColGroups(inputMatrix, outputMatrix, op);
if(isValidForParallelProcessing(inputMatrix, op))
tryToAggregateInParallel(inputMatrix, outputMatrix, op);
aggregateSingleThreaded(inputMatrix, outputMatrix, op);
postProcessAggregate(inputMatrix, outputMatrix, op);
private static boolean isValidForParallelProcessing(CompressedMatrixBlock m1, AggregateUnaryOperator op) {
return op.getNumThreads() > 1 && m1.getExactSizeOnDisk() > MIN_PAR_AGG_THRESHOLD;
private static void aggregateUncompressedColGroups(CompressedMatrixBlock m1, MatrixBlock ret,
AggregateUnaryOperator op) {
ColGroupUncompressed uc = m1.getUncompressedColGroup();
if(uc != null)
uc.unaryAggregateOperations(op, ret);
private static void tryToAggregateInParallel(CompressedMatrixBlock m1, MatrixBlock ret, AggregateUnaryOperator op) {
int k = op.getNumThreads();
if(k == 1)
aggregateUnaryOperations(op, m1.getColGroups(), ret, 0, m1.getNumRows(), m1.getNumColumns());
aggregateInParallel(m1, ret, op, k);
private static void aggregateInParallel(CompressedMatrixBlock m1, MatrixBlock ret, AggregateUnaryOperator op,
int k) {
ExecutorService pool = CommonThreadPool.get(op.getNumThreads());
ArrayList<UnaryAggregateTask> tasks = new ArrayList<>();
try {
// compute all compressed column groups
if(op.indexFn instanceof ReduceCol) {
final int blkz = CompressionSettings.BITMAP_BLOCK_SZ;
// int blklen = Math.max((int) Math.ceil((double) m1.getNumRows() / (op.getNumThreads() * 2)), blkz);
// blklen += (blklen % blkz != 0) ? blkz - blklen % blkz : 0;
int blklen = blkz * 4;
for(int i = 0; i * blklen < m1.getNumRows(); i++)
tasks.add(new UnaryAggregateTask(m1.getColGroups(), ret, i * blklen,
Math.min((i + 1) * blklen, m1.getNumRows()), op, m1.getNumColumns()));
else {
List<List<ColGroup>> grpParts = createTaskPartitionNotIncludingUncompressable(m1.getColGroups(), k);
for(List<ColGroup> grp : grpParts)
tasks.add(new UnaryAggregateTask(grp, ret, 0, m1.getNumRows(), op, m1.getNumColumns(),
List<Future<MatrixBlock>> futures = pool.invokeAll(tasks);
// aggregate partial results
if(op.indexFn instanceof ReduceAll)
if(op.aggOp.increOp.fn instanceof Builtin)
aggregateResults(ret, futures, op);
sumResults(ret, futures);
else if(op.indexFn instanceof ReduceRow && m1.isOverlapping()) {
if(op.aggOp.increOp.fn instanceof Builtin)
aggregateResultVectors(ret, futures, op);
sumResultVectors(ret, futures);
for(Future<MatrixBlock> f : futures)
catch(InterruptedException | ExecutionException e) {
throw new DMLRuntimeException(e);
private static void sumResults(MatrixBlock ret, List<Future<MatrixBlock>> futures)
throws InterruptedException, ExecutionException {
double val = ret.quickGetValue(0, 0);
for(Future<MatrixBlock> rtask : futures) {
double tmp = rtask.get().quickGetValue(0, 0);
val = val + tmp;
ret.quickSetValue(0, 0, val);
private static void sumResultVectors(MatrixBlock ret, List<Future<MatrixBlock>> futures)
throws InterruptedException, ExecutionException {
double[] retVals = ret.getDenseBlockValues();
for(Future<MatrixBlock> rtask : futures) {
double[] taskResult = rtask.get().getDenseBlockValues();
for(int i = 0; i < retVals.length; i++) {
retVals[i] += taskResult[i];
private static void aggregateResults(MatrixBlock ret, List<Future<MatrixBlock>> futures, AggregateUnaryOperator op)
throws InterruptedException, ExecutionException {
double val = ret.quickGetValue(0, 0);
for(Future<MatrixBlock> rtask : futures) {
double tmp = rtask.get().quickGetValue(0, 0);
val = op.aggOp.increOp.fn.execute(val, tmp);
ret.quickSetValue(0, 0, val);
private static void aggregateResultVectors(MatrixBlock ret, List<Future<MatrixBlock>> futures,
AggregateUnaryOperator op) throws InterruptedException, ExecutionException {
double[] retVals = ret.getDenseBlockValues();
for(Future<MatrixBlock> rtask : futures) {
double[] taskResult = rtask.get().getDenseBlockValues();
for(int i = 0; i < retVals.length; i++) {
retVals[i] = op.aggOp.increOp.fn.execute(retVals[i] , taskResult[i]);
private static void aggregateSingleThreaded(CompressedMatrixBlock m1, MatrixBlock ret, AggregateUnaryOperator op) {
aggregateUnaryOperations(op, m1.getColGroups(), ret, 0, m1.getNumRows(), m1.getNumColumns());
private static void divideByNumberOfCellsForMean(CompressedMatrixBlock m1, MatrixBlock ret, IndexFunction idxFn) {
if(idxFn instanceof ReduceAll)
divideByNumberOfCellsForMeanAll(m1, ret);
else if(idxFn instanceof ReduceCol)
divideByNumberOfCellsForMeanRows(m1, ret);
else if(idxFn instanceof ReduceRow)
divideByNumberOfCellsForMeanCols(m1, ret);
private static void divideByNumberOfCellsForMeanRows(CompressedMatrixBlock m1, MatrixBlock ret) {
for(int i = 0; i < m1.getNumRows(); i++) {
ret.quickSetValue(i, 0, ret.quickGetValue(i, 0) / m1.getNumColumns());
private static void divideByNumberOfCellsForMeanCols(CompressedMatrixBlock m1, MatrixBlock ret) {
for(int i = 0; i < m1.getNumColumns(); i++) {
ret.quickSetValue(0, i, ret.quickGetValue(0, i) / m1.getNumRows());
private static void divideByNumberOfCellsForMeanAll(CompressedMatrixBlock m1, MatrixBlock ret) {
ret.quickSetValue(0, 0, ret.quickGetValue(0, 0) / (m1.getNumColumns() * m1.getNumRows()));
private static void postProcessAggregate(CompressedMatrixBlock m1, MatrixBlock ret, AggregateUnaryOperator op) {
if(op.aggOp.increOp.fn instanceof Mean)
divideByNumberOfCellsForMean(m1, ret, op.indexFn);
private static void aggregateUnaryOverlapping(CompressedMatrixBlock m1, MatrixBlock ret, AggregateUnaryOperator op,
MatrixIndexes indexesIn, boolean inCP) {
try {
List<Future<MatrixBlock>> rtasks = generateUnaryAggregateOverlappingFutures(m1, ret, op);
reduceOverlappingFutures(rtasks, ret, op);
catch(InterruptedException | ExecutionException e) {
throw new DMLRuntimeException(e);
private static void reduceOverlappingFutures(List<Future<MatrixBlock>> rtasks, MatrixBlock ret,
AggregateUnaryOperator op) throws InterruptedException, ExecutionException {
if(isReduceAll(ret, op.indexFn))
reduceAllOverlappingFutures(rtasks, ret, op);
else if(op.indexFn instanceof ReduceRow)
reduceColOverlappingFutures(rtasks, ret, op);
reduceRowOverlappingFutures(rtasks, ret, op);
private static void reduceColOverlappingFutures(List<Future<MatrixBlock>> rtasks, MatrixBlock ret,
AggregateUnaryOperator op) throws InterruptedException, ExecutionException {
for(Future<MatrixBlock> rtask : rtasks) {
(op.aggOp.increOp.fn instanceof KahanFunction) ? new BinaryOperator(
Plus.getPlusFnObject()) : op.aggOp.increOp);
private static void reduceRowOverlappingFutures(List<Future<MatrixBlock>> rtasks, MatrixBlock ret,
AggregateUnaryOperator op) throws InterruptedException, ExecutionException {
for(Future<MatrixBlock> rtask : rtasks) {
private static boolean isReduceAll(MatrixBlock ret, IndexFunction idxFn) {
return idxFn instanceof ReduceAll || (ret.getNumColumns() == 1 && ret.getNumRows() == 1);
private static void reduceAllOverlappingFutures(List<Future<MatrixBlock>> rtasks, MatrixBlock ret,
AggregateUnaryOperator op) throws InterruptedException, ExecutionException {
if(op.aggOp.increOp.fn instanceof KahanFunction) {
KahanObject kbuff = new KahanObject(ret.quickGetValue(0, 0), 0);
KahanPlus kplus = KahanPlus.getKahanPlusFnObject();
for(Future<MatrixBlock> rtask : rtasks) {
double tmp = rtask.get().quickGetValue(0, 0);
kplus.execute2(kbuff, tmp);
ret.quickSetValue(0, 0, kbuff._sum);
else {
double val = ret.quickGetValue(0, 0);
for(Future<MatrixBlock> rtask : rtasks) {
double tmp = rtask.get().quickGetValue(0, 0);
val = op.aggOp.increOp.fn.execute(val, tmp);
ret.quickSetValue(0, 0, val);
private static List<Future<MatrixBlock>> generateUnaryAggregateOverlappingFutures(CompressedMatrixBlock m1,
MatrixBlock ret, AggregateUnaryOperator op) throws InterruptedException {
ExecutorService pool = CommonThreadPool.get(op.getNumThreads());
ArrayList<UnaryAggregateOverlappingTask> tasks = new ArrayList<>();
final int blklen = Math.min(m1.getNumRows() / op.getNumThreads(), CompressionSettings.BITMAP_BLOCK_SZ);
for(int i = 0; i * blklen < m1.getNumRows(); i++)
tasks.add(new UnaryAggregateOverlappingTask(m1, ret, i * blklen,
Math.min((i + 1) * blklen, m1.getNumRows()), op));
List<Future<MatrixBlock>> futures = pool.invokeAll(tasks);
return futures;
private static List<List<ColGroup>> createTaskPartitionNotIncludingUncompressable(List<ColGroup> colGroups, int k) {
int numTasks = Math.min(k, colGroups.size());
List<List<ColGroup>> grpParts = new ArrayList<>();
for(int i = 0; i < numTasks; i++) {
grpParts.add(new ArrayList<>());
int pos = 0;
for(ColGroup grp : colGroups) {
if(!(grp instanceof ColGroupUncompressed)) {
List<ColGroup> g = grpParts.get(pos);
pos = (pos + 1) % numTasks;
return grpParts;
private static void aggregateUnaryOperations(AggregateUnaryOperator op, List<ColGroup> groups, MatrixBlock ret,
int rl, int ru, int numColumns) {
if(op.indexFn instanceof ReduceCol && op.aggOp.increOp.fn instanceof Builtin)
aggregateUnaryBuiltinRowOperation(op, groups, ret, rl, ru, numColumns);
aggregateUnaryNormalOperation(op, groups, ret, rl, ru, numColumns);
private static void aggregateUnaryNormalOperation(AggregateUnaryOperator op, List<ColGroup> groups, MatrixBlock ret,
int rl, int ru, int numColumns) {
for(ColGroup grp : groups)
grp.unaryAggregateOperations(op, ret, rl, ru);
private static void aggregateUnaryBuiltinRowOperation(AggregateUnaryOperator op, List<ColGroup> groups,
MatrixBlock ret, int rl, int ru, int numColumns) {
int[] rnnz = null;
int numberDenseColumns = 0;
for(ColGroup grp : groups) {
grp.unaryAggregateOperations(op, ret, rl, ru);
numberDenseColumns += grp.getNumCols();
if (rnnz == null)
rnnz = new int[ru - rl];
grp.countNonZerosPerRow(rnnz, rl, ru);
if(rnnz != null)
for(int row = rl; row < ru; row++)
if(rnnz[row-rl] + numberDenseColumns < numColumns)
ret.quickSetValue(row, 0, op.aggOp.increOp.fn.execute(ret.quickGetValue(row, 0), 0.0));
private static void fillStart(MatrixBlock ret, AggregateUnaryOperator op) {
if(op.aggOp.increOp.fn instanceof Builtin) {
Double val = null;
switch(((Builtin) op.aggOp.increOp.fn).getBuiltinCode()) {
case MAX:
case MIN:
if(val != null) {
private static class UnaryAggregateTask implements Callable<MatrixBlock> {
private final List<ColGroup> _groups;
private final int _rl;
private final int _ru;
private final MatrixBlock _ret;
private final int _numColumns;
private final AggregateUnaryOperator _op;
protected UnaryAggregateTask(List<ColGroup> groups, MatrixBlock ret, int rl, int ru, AggregateUnaryOperator op,
int numColumns) {
_groups = groups;
_op = op;
_rl = rl;
_ru = ru;
_numColumns = numColumns;
if(_op.indexFn instanceof ReduceAll) { // sum
_ret = new MatrixBlock(1, 1, false);
if(_op.aggOp.increOp.fn instanceof Builtin)
ret.getNumRows() * ret.getNumColumns());
else // colSums / rowSums
_ret = ret;
protected UnaryAggregateTask(List<ColGroup> groups, MatrixBlock ret, int rl, int ru, AggregateUnaryOperator op,
int numColumns, boolean overlapping) {
_groups = groups;
_op = op;
_rl = rl;
_ru = ru;
_numColumns = numColumns;
if(_op.indexFn instanceof ReduceAll || (_op.indexFn instanceof ReduceRow && overlapping)) {
_ret = new MatrixBlock(ret.getNumRows(), ret.getNumColumns(), false);
if(_op.aggOp.increOp.fn instanceof Builtin)
ret.getNumRows() * ret.getNumColumns());
else // colSums / rowSums
_ret = ret;
public MatrixBlock call() {
aggregateUnaryOperations(_op, _groups, _ret, _rl, _ru, _numColumns);
return _ret;
private static class UnaryAggregateOverlappingTask implements Callable<MatrixBlock> {
private final CompressedMatrixBlock _m1;
private final int _rl;
private final int _ru;
private final MatrixBlock _ret;
private final AggregateUnaryOperator _op;
protected UnaryAggregateOverlappingTask(CompressedMatrixBlock m1, MatrixBlock ret, int rl, int ru,
AggregateUnaryOperator op) {
_m1 = m1;
_op = op;
_rl = rl;
_ru = ru;
_ret = ret;
private MatrixBlock setupOutputMatrix() {
MatrixBlock outputBlock;
if(_op.indexFn instanceof ReduceAll)
outputBlock = new MatrixBlock(_ret.getNumRows(), _ret.getNumColumns(), false).allocateDenseBlock();
else if(_op.indexFn instanceof ReduceCol)
outputBlock = new MatrixBlock(_ru - _rl, _ret.getNumColumns(), false).allocateDenseBlock();
outputBlock = new MatrixBlock(_ret.getNumRows(), _ret.getNumColumns(), false).allocateDenseBlock();
if(_op.aggOp.increOp.fn instanceof Builtin)
if(_op.indexFn instanceof ReduceCol)
_rl * _ret.getNumColumns(),
return outputBlock;
private MatrixBlock getTmp() {
MatrixBlock tmp = memPool.get();
if(tmp == null) {
memPool.set(new MatrixBlock(_ru - _rl, _m1.getNumColumns(), false, -1).allocateBlock());
tmp = memPool.get();
else {
tmp = memPool.get();
tmp.reset(_ru - _rl, _m1.getNumColumns(), false, -1);
return tmp;
private MatrixBlock decompressToTemp() {
MatrixBlock tmp = getTmp();
for(ColGroup g : _m1.getColGroups())
g.decompressToBlockSafe(tmp, _rl, _ru, 0, g.getValues(), false);
tmp.setNonZeros(_rl + _ru);
return tmp;
public MatrixBlock call() {
MatrixBlock tmp = decompressToTemp();
MatrixBlock outputBlock = setupOutputMatrix();
LibMatrixAgg.aggregateUnaryMatrix(tmp, outputBlock, _op);
if(_op.indexFn instanceof ReduceCol) {
double[] retValues = _ret.getDenseBlockValues();
int currentIndex = _rl * _ret.getNumColumns();
double[] outputBlockValues = outputBlock.getDenseBlockValues();
System.arraycopy(outputBlockValues, 0, retValues, currentIndex, outputBlockValues.length);
return null;
else {
return outputBlock;