[SYSTEMDS-2613-2614] Sparse & dense compressed MM
This commit adds the sparse left and dense right matrix multiplication
for CLA and LCLA.
- Fix OLE and don't use skip indexes
- Static Cost Partitioner
diff --git a/src/main/java/org/apache/sysds/runtime/compress/BitmapEncoder.java b/src/main/java/org/apache/sysds/runtime/compress/BitmapEncoder.java
index 6683d64..2420f0d 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/BitmapEncoder.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/BitmapEncoder.java
@@ -59,8 +59,7 @@
* @param compSettings The compression settings used for the compression.
* @return uncompressed bitmap representation of the columns
*/
- public static ABitmap extractBitmap(int[] colIndices, MatrixBlock rawBlock,
- CompressionSettings compSettings) {
+ public static ABitmap extractBitmap(int[] colIndices, MatrixBlock rawBlock, CompressionSettings compSettings) {
// note: no sparse column selection reader because low potential
// single column selection
Bitmap res = null;
@@ -197,6 +196,14 @@
System.arraycopy(val.key.getData(), 0, values, bitmapIx * numCols, numCols);
offsetsLists[bitmapIx++] = val.value;
}
+
+ // HACK; we make sure that the first sparse unsafe operation assume
+ // that we have entries with zero values. This makes the first sparse
+ // unsafe operation slightly slower, if the input compressed matrix is
+ // fully dense, aka containing no zero values.
+ // This is required for multi-column colGroups.
+ numZeros = (numColumns > 1) ? numZeros + 1 : numZeros;
+
return new Bitmap(numCols, offsetsLists, numZeros, values);
}
@@ -218,6 +225,7 @@
values[bitmapIx] = val.key;
offsetsLists[bitmapIx++] = val.value;
}
+
return new Bitmap(1, offsetsLists, numZeros, values);
}
@@ -281,12 +289,14 @@
IntArrayList[] fullSizeOffsetsLists = ubm.getOffsetList();
int numZeroGroups = ubm.getZeroCounts();
+ boolean somethingToMerge = false;
for(int idx = 0; idx < scaledValues.length; idx++) {
if(scaledValues[idx] != 0) { // Throw away zero values.
if(values.containsKey(scaledValues[idx])) {
values.get(scaledValues[idx]).add(fullSizeOffsetsLists[idx]);
lengths.put(scaledValues[idx], lengths.get(scaledValues[idx]) + fullSizeOffsetsLists[idx].size());
+ somethingToMerge = true;
}
else {
Queue<IntArrayList> offsets = new LinkedList<>();
@@ -299,23 +309,29 @@
numZeroGroups++;
}
}
- byte[] scaledValuesReduced = new byte[values.keySet().size()];
- IntArrayList[] newOffsetsLists = new IntArrayList[values.keySet().size()];
- Iterator<Entry<Byte, Queue<IntArrayList>>> x = values.entrySet().iterator();
- int idx = 0;
- while(x.hasNext()) {
- Entry<Byte, Queue<IntArrayList>> ent = x.next();
- scaledValuesReduced[idx] = ent.getKey().byteValue();
- Queue<IntArrayList> q = ent.getValue();
- if(q.size() == 1) {
- newOffsetsLists[idx] = q.remove();
+
+ if(somethingToMerge) {
+ byte[] scaledValuesReduced = new byte[values.keySet().size()];
+ IntArrayList[] newOffsetsLists = new IntArrayList[values.keySet().size()];
+ Iterator<Entry<Byte, Queue<IntArrayList>>> x = values.entrySet().iterator();
+ int idx = 0;
+ while(x.hasNext()) {
+ Entry<Byte, Queue<IntArrayList>> ent = x.next();
+ scaledValuesReduced[idx] = ent.getKey().byteValue();
+ Queue<IntArrayList> q = ent.getValue();
+ if(q.size() == 1) {
+ newOffsetsLists[idx] = q.remove();
+ }
+ else {
+ newOffsetsLists[idx] = mergeOffsets(q, new int[lengths.get(ent.getKey())]);
+ }
+ idx++;
}
- else {
- newOffsetsLists[idx] = mergeOffsets(q, new int[lengths.get(ent.getKey())]);
- }
- idx++;
+ return new BitmapLossy(ubm.getNumColumns(), newOffsetsLists, numZeroGroups, scaledValuesReduced, scale);
}
- return new BitmapLossy(ubm.getNumColumns(), newOffsetsLists, numZeroGroups, scaledValuesReduced, scale);
+ else {
+ return new BitmapLossy(ubm.getNumColumns(), fullSizeOffsetsLists, numZeroGroups, scaledValues, scale);
+ }
}
/**
@@ -333,6 +349,7 @@
IntArrayList[] fullSizeOffsetsLists = ubm.getOffsetList();
int numZeroGroups = ubm.getZeroCounts();
boolean allZero = true;
+ boolean somethingToMerge = false;
for(int idx = 0; idx < scaledValues.length; idx += numColumns) {
List<Byte> array = new ArrayList<>();
for(int off = 0; off < numColumns; off++) {
@@ -342,37 +359,57 @@
numZeroGroups += allZero ? 1 : 0;
if(!allZero) {
+ IntArrayList entry = fullSizeOffsetsLists[idx / numColumns];
if(values.containsKey(array)) {
- values.get(array).add(fullSizeOffsetsLists[idx / numColumns]);
- lengths.put(array, lengths.get(array) + fullSizeOffsetsLists[idx / numColumns].size());
+ values.get(array).add(entry);
+ lengths.put(array, lengths.get(array) + entry.size());
+ somethingToMerge = true;
}
else {
Queue<IntArrayList> offsets = new LinkedList<>();
- offsets.add(fullSizeOffsetsLists[idx / numColumns]);
+ offsets.add(entry);
values.put(array, offsets);
- lengths.put(array, fullSizeOffsetsLists[idx / numColumns].size());
+ lengths.put(array, entry.size());
}
}
allZero = true;
}
- byte[] scaledValuesReduced = new byte[values.keySet().size() * numColumns];
- IntArrayList[] newOffsetsLists = new IntArrayList[values.keySet().size()];
- Iterator<Entry<List<Byte>, Queue<IntArrayList>>> x = values.entrySet().iterator();
- int idx = 0;
- while(x.hasNext()) {
- Entry<List<Byte>, Queue<IntArrayList>> ent = x.next();
- List<Byte> key = ent.getKey();
- int row = idx * numColumns;
- for(int off = 0; off < numColumns; off++) {
- scaledValuesReduced[row + off] = key.get(off);
- }
- Queue<IntArrayList> q = ent.getValue();
- newOffsetsLists[idx] = mergeOffsets(q, new int[lengths.get(key)]);
- idx++;
- }
+ // HACK; we make sure that the first sparse unsafe operation assume
+ // that we have entries with zero values. This makes the first sparse
+ // unsafe operation slightly slower, if the input compressed matrix is
+ // fully dense, aka containing no zero values.
+ // This is required for multi-column colGroups.
+ numZeroGroups = numZeroGroups + 1;
- return new BitmapLossy(ubm.getNumColumns(), newOffsetsLists, numZeroGroups, scaledValuesReduced, scale);
+ if(somethingToMerge) {
+
+ byte[] scaledValuesReduced = new byte[values.keySet().size() * numColumns];
+ IntArrayList[] newOffsetsLists = new IntArrayList[values.keySet().size()];
+ Iterator<Entry<List<Byte>, Queue<IntArrayList>>> x = values.entrySet().iterator();
+ int idx = 0;
+ while(x.hasNext()) {
+ Entry<List<Byte>, Queue<IntArrayList>> ent = x.next();
+ List<Byte> key = ent.getKey();
+ int row = idx * numColumns;
+ for(int off = 0; off < numColumns; off++) {
+ scaledValuesReduced[row + off] = key.get(off);
+ }
+ Queue<IntArrayList> q = ent.getValue();
+ if(q.size() == 1) {
+ newOffsetsLists[idx] = q.remove();
+ }
+ else {
+ newOffsetsLists[idx] = mergeOffsets(q, new int[lengths.get(key)]);
+ }
+ idx++;
+ }
+
+ return new BitmapLossy(ubm.getNumColumns(), newOffsetsLists, numZeroGroups, scaledValuesReduced, scale);
+ }
+ else {
+ return new BitmapLossy(ubm.getNumColumns(), fullSizeOffsetsLists, numZeroGroups, scaledValues, scale);
+ }
}
/**
@@ -411,7 +448,6 @@
return res;
}
-
/**
* Statistics class to analyse what compression plan to use.
*/
@@ -423,15 +459,19 @@
protected boolean sameDelta;
public Stats(double[] fp) {
- max = fp[fp.length - 1];
- min = fp[0];
- minDelta = Double.POSITIVE_INFINITY;
+ max = Double.NEGATIVE_INFINITY;
+ min = Double.POSITIVE_INFINITY;
maxDelta = Double.NEGATIVE_INFINITY;
+ minDelta = Double.POSITIVE_INFINITY;
sameDelta = true;
if(fp.length > 1) {
double delta = fp[0] - fp[1];
for(int i = 0; i < fp.length - 1; i++) {
+ if(fp[i] > max)
+ max = fp[i];
+ if(fp[i] < min)
+ min = fp[i];
double ndelta = fp[i] - fp[i + 1];
if(delta < minDelta) {
minDelta = delta;
@@ -444,7 +484,29 @@
}
delta = ndelta;
}
+ if(fp[fp.length - 1] > max)
+ max = fp[fp.length - 1];
+ if(fp[fp.length - 1] < min)
+ min = fp[fp.length - 1];
+ }else{
+ max = fp[0];
+ min = fp[0];
+ maxDelta = 0;
+ minDelta = 0;
}
}
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Stats{" + this.hashCode() + "}");
+ sb.append(" max: " + max);
+ sb.append(" min: " + min);
+ sb.append(" minΔ: " + minDelta);
+ sb.append(" maxΔ: " + maxDelta);
+ sb.append(" sameΔ: " + maxDelta);
+ return sb.toString();
+ }
+
}
}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
index dacce77..8a8e27b 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlock.java
@@ -41,12 +41,15 @@
import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.lops.MMTSJ.MMTSJType;
import org.apache.sysds.lops.MapMultChain.ChainType;
+import org.apache.sysds.runtime.DMLCompressionException;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.compress.colgroup.ColGroup;
import org.apache.sysds.runtime.compress.colgroup.ColGroup.CompressionType;
import org.apache.sysds.runtime.compress.colgroup.ColGroupConverter;
import org.apache.sysds.runtime.compress.colgroup.ColGroupDDC;
import org.apache.sysds.runtime.compress.colgroup.ColGroupIO;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupOLE;
+import org.apache.sysds.runtime.compress.colgroup.ColGroupRLE;
import org.apache.sysds.runtime.compress.colgroup.ColGroupUncompressed;
import org.apache.sysds.runtime.compress.colgroup.ColGroupValue;
import org.apache.sysds.runtime.compress.colgroup.DenseRowIterator;
@@ -212,7 +215,6 @@
ret.setNonZeros(nonZeros);
LOG.debug("decompressed block w/ k=" + k + " in " + time.stop() + "ms.");
-
return ret;
}
@@ -245,16 +247,16 @@
public double quickGetValue(int r, int c) {
// TODO Optimize Quick Get Value, to located the correct column group without having to search for it
-
ColGroup grp = null;
for(ColGroup group : _colGroups) {
-
if(Arrays.binarySearch(group.getColIndices(), c) >= 0) {
grp = group;
break;
}
}
-
+ if(grp == null) {
+ throw new DMLCompressionException("ColGroup for column index not found");
+ }
// find row value
return grp.get(r, c);
}
@@ -429,7 +431,7 @@
for(ColGroup grp : _colGroups) {
if(grp instanceof ColGroupUncompressed) {
- LOG.error("NOT HANDLING UNCOMPRESSED IN BINARY MV");
+ throw new DMLCompressionException("Not supported Binary MV");
}
else {
@@ -595,7 +597,7 @@
// create output matrix block
if(ret == null)
ret = new MatrixBlock(rl, cl, false, rl * cl);
- else
+ else if(!(ret.getNumColumns() == cl && ret.getNumRows() == rl && ret.isAllocated()))
ret.reset(rl, cl, false, rl * cl);
if(right) {
@@ -608,18 +610,19 @@
}
else {
that = that instanceof CompressedMatrixBlock ? ((CompressedMatrixBlock) that).decompress() : that;
- ret = rightMultByMatrix(_colGroups, that, ret, op.getNumThreads(), getNumColumns());
+ ret = rightMultByMatrix(_colGroups, that, ret, op.getNumThreads(), that.getNumColumns());
}
}
- else{ // Left
+ else { // Left
that = that instanceof CompressedMatrixBlock ? ((CompressedMatrixBlock) that).decompress() : that;
- if(that.getNumRows() == 1){
+ if(that.getNumRows() == 1) {
if(op.getNumThreads() > 1)
- leftMultByVectorTranspose(_colGroups, that, ret, false, op.getNumThreads());
+ return leftMultByVectorTranspose(_colGroups, that, ret, false, op.getNumThreads());
else
- leftMultByVectorTranspose(_colGroups, that, ret, false, true);
- }else{
- leftMultByMatrix(_colGroups, that, ret, op.getNumThreads(), 1);
+ return leftMultByVectorTranspose(_colGroups, that, ret, false, true);
+ }
+ else {
+ return leftMultByMatrix(_colGroups, that, ret, op.getNumThreads(), this.getNumColumns());
}
}
@@ -723,6 +726,14 @@
}
ret.quickSetValue(0, 0, kbuff._sum);
}
+ else if(op.aggOp.increOp.fn instanceof Mean) {
+ double val = ret.quickGetValue(0, 0);
+ for(Future<MatrixBlock> rtask : rtasks) {
+ double tmp = rtask.get().quickGetValue(0, 0);
+ val = val + tmp;
+ }
+ ret.quickSetValue(0, 0, val);
+ }
else {
double val = ret.quickGetValue(0, 0);
for(Future<MatrixBlock> rtask : rtasks) {
@@ -739,12 +750,10 @@
}
}
else {
- // process UC column group
for(ColGroup grp : _colGroups)
if(grp instanceof ColGroupUncompressed)
((ColGroupUncompressed) grp).unaryAggregateOperations(op, ret);
- // process OLE/RLE column groups
aggregateUnaryOperations(op, _colGroups, ret, 0, rlen);
}
@@ -794,33 +803,12 @@
private static void aggregateUnaryOperations(AggregateUnaryOperator op, List<ColGroup> groups, MatrixBlock ret,
int rl, int ru) {
- // Seems misplaced logic for when to use CacheDDC
- // boolean cacheDDC1 = false;
- // op.indexFn instanceof ReduceCol && op.aggOp.increOp.fn instanceof KahanPlus // rowSums
- // && ColGroupOffset.ALLOW_CACHE_CONSCIOUS_ROWSUMS && ru - rl > CompressionSettings.BITMAP_BLOCK_SZ;
-
- // process cache-conscious DDC1 groups (adds to output)
- // if(cacheDDC1) {
- // ArrayList<ColGroupDDC1> tmp = new ArrayList<>();
- // for(ColGroup grp : groups)
- // if(grp instanceof ColGroupDDC1)
- // tmp.add((ColGroupDDC1) grp);
- // if(!tmp.isEmpty())
- // ColGroupDDC1
- // .computeRowSums(tmp.toArray(new ColGroupDDC1[0]), ret, KahanPlus.getKahanPlusFnObject(), rl, ru);
- // }
-
- // process remaining groups (adds to output)
// note: UC group never passed into this function
double[] c = ret.getDenseBlockValues();
- if(c == null) {
- c = ret.getSparseBlock().values(0);
- // throw new RuntimeException("aggregateUnaryOperation failed to materialize matrix data");
- }
for(ColGroup grp : groups)
if(!(grp instanceof ColGroupUncompressed))
grp.unaryAggregateOperations(op, c, rl, ru);
- // LOG.debug(Arrays.toString(c));
+
}
@Override
@@ -946,10 +934,10 @@
int blklen = (int) (Math.ceil((double) rlen / k));
blklen += (blklen % seqsz != 0) ? seqsz - blklen % seqsz : 0;
- ArrayList<RightMatrixMultTask> tasks = new ArrayList<>();
+ ArrayList<RightMatrixVectorMultTask> tasks = new ArrayList<>();
for(int i = 0; i < k & i * blklen < getNumRows(); i++) {
- tasks.add(
- new RightMatrixMultTask(_colGroups, vector, result, i * blklen, Math.min((i + 1) * blklen, rlen)));
+ tasks.add(new RightMatrixVectorMultTask(_colGroups, vector, result, i * blklen,
+ Math.min((i + 1) * blklen, rlen)));
}
List<Future<Long>> ret = pool.invokeAll(tasks);
@@ -961,10 +949,10 @@
lnnz += tmp.get();
result.setNonZeros(lnnz);
}
- catch(Exception e) {
- LOG.error(e);
+ catch(InterruptedException | ExecutionException e) {
throw new DMLRuntimeException(e);
}
+
}
private static void rightMultByVector(List<ColGroup> groups, MatrixBlock vect, MatrixBlock ret, int rl, int ru) {
@@ -1012,11 +1000,11 @@
* @param result buffer to hold the result; must have the appropriate size already
* @param doTranspose if true, transpose vector
*/
- private static void leftMultByVectorTranspose(List<ColGroup> colGroups, MatrixBlock vector, MatrixBlock result,
- boolean doTranspose, boolean allocTmp) {
- // transpose vector if required
- LOG.debug("Left Mult vector Transpose " + vector.getClass());
+ private static MatrixBlock leftMultByVectorTranspose(List<ColGroup> colGroups, MatrixBlock vector,
+ MatrixBlock result, boolean doTranspose, boolean allocTmp) {
+
MatrixBlock rowVector = vector;
+ // Note that transpose here is a metadata operation since the input is a vector.
if(doTranspose) {
rowVector = new MatrixBlock(1, vector.getNumRows(), false);
LibMatrixReorg.transpose(vector, rowVector);
@@ -1028,12 +1016,12 @@
// setup memory pool for reuse
if(allocTmp) {
- Pair<Integer, List<Integer>> v = getMaxNumValues(colGroups);
- ColGroupValue.setupThreadLocalMemory(v.getLeft());
+ Pair<Integer, int[]> v = getMaxNumValues(colGroups);
+ ColGroupValue.setupThreadLocalMemory(v.getLeft() + 1); // +1 for efficiency in DDC groups.
for(int i = 0; i < colGroups.size(); i++) {
colGroups.get(i).leftMultByRowVector(rowVector.getDenseBlockValues(),
result.getDenseBlockValues(),
- v.getRight().get(i));
+ v.getRight()[i]);
}
}
else {
@@ -1048,6 +1036,8 @@
if(allocTmp)
ColGroupValue.cleanupThreadLocalMemory();
result.recomputeNonZeros();
+
+ return result;
}
// private static void leftMultByVectorTranspose(List<ColGroup> colGroups, ColGroupDDC vector, MatrixBlock result) {
@@ -1069,7 +1059,7 @@
* @param doTranspose if true, transpose vector
* @param k number of threads
*/
- private void leftMultByVectorTranspose(List<ColGroup> colGroups, MatrixBlock vector, MatrixBlock result,
+ private MatrixBlock leftMultByVectorTranspose(List<ColGroup> colGroups, MatrixBlock vector, MatrixBlock result,
boolean doTranspose, int k) {
// transpose vector if required
MatrixBlock rowVector = vector;
@@ -1111,6 +1101,7 @@
// post-processing
result.recomputeNonZeros();
+ return result;
}
/**
@@ -1124,111 +1115,72 @@
* @param k The number of threads used
* @param numColumns The number of columns in this colGroup
*/
- private static void leftMultByMatrix(List<ColGroup> colGroups, MatrixBlock that, MatrixBlock ret, int k,
+ private static MatrixBlock leftMultByMatrix(List<ColGroup> colGroups, MatrixBlock that, MatrixBlock ret, int k,
int numColumns) {
-
- ret.reset();
ret.allocateDenseBlock();
-
if(that.isInSparseFormat()) {
- LOG.warn("Inefficient materialization of sparse matrix for left compressed matrix mult.");
-
- // leftMultBySparseMatrix(colGroups, that, ret, k, numColumns);
- leftMultByDenseMatrix(colGroups, that.allocateDenseBlock(), ret, k, numColumns);
+ ret = leftMultBySparseMatrix(colGroups, that, ret, k, numColumns);
}
else {
- leftMultByDenseMatrix(colGroups, that, ret, k, numColumns);
+ ret = leftMultByDenseMatrix(colGroups, that, ret, k, numColumns);
}
- ret.recomputeNonZeros();
+ ret.setNonZeros(ret.getNumColumns() * ret.getNumRows());
+ return ret;
}
- private static MatrixBlock rightMultByMatrix(List<ColGroup> colGroups,
- MatrixBlock that, MatrixBlock ret, int k, int numColumns)
- {
- // Exchange with rightMultByDenseMatrix.
-
- ret.reset();
+ private static MatrixBlock rightMultByMatrix(List<ColGroup> colGroups, MatrixBlock that, MatrixBlock ret, int k,
+ int numColumns) {
ret.allocateDenseBlock();
- // transpose input to optimize column access.
- that = LibMatrixReorg
- .transpose(that, new MatrixBlock(that.getNumColumns(), that.getNumRows(), that.isInSparseFormat()), k);
-
if(that.isInSparseFormat()) {
- LOG.warn("Inefficient materialization of sparse matrix for right compressed matrix mult.");
- rightMultByDenseMatrix(colGroups, that.allocateDenseBlock(), ret, k, numColumns);
+ ret = rightMultBySparseMatrix(colGroups, that, ret, k, numColumns);
}
else {
- rightMultByDenseMatrix(colGroups, that, ret, k, numColumns);
+ ret = rightMultByDenseMatrix(colGroups, that, ret, k, numColumns);
+
}
- ret.recomputeNonZeros();
+ ret.setNonZeros(ret.getNumColumns() * ret.getNumRows());
return ret;
- // ret = LibMatrixReorg
- // .transpose(ret, new MatrixBlock(ret.getNumColumns(), ret.getNumRows(), ret.isInSparseFormat()), k);
-
- // MatrixBlock tmpIn = new MatrixBlock(1, that.getNumColumns(), false).allocateBlock();
- // MatrixBlock tmpOut = new MatrixBlock(this.getNumRows(), 1, false).allocateBlock();
- // for(int i = 0; i < that.getNumRows(); i++) { // on transpose
- // tmpIn = that.slice(i, i, 0, that.getNumColumns() - 1, tmpIn);
- // MatrixBlock tmpIn2 = LibMatrixReorg // meta data op
- // .transpose(tmpIn, new MatrixBlock(tmpIn.getNumColumns(), tmpIn.getNumRows(), false));
- // tmpOut.reset();
- // if(k > 1)
- // rightMultByVector(tmpIn2, tmpOut, k);
- // else
- // rightMultByVector(tmpIn2, tmpOut);
- // MatrixBlock tmpOutT = LibMatrixReorg // meta data op
- // .transpose(tmpOut, new MatrixBlock(tmpOut.getNumColumns(), tmpOut.getNumRows(), false), k);
- // ret.leftIndexingOperations(tmpOutT, i, i, 0, ret.getNumColumns() - 1, ret, UpdateType.INPLACE);
- // }
- // ret.checkNonZeros();
- // ret = LibMatrixReorg
- // .transpose(ret, new MatrixBlock(ret.getNumColumns(), ret.getNumRows(), ret.isInSparseFormat()), k);
- // return ret;
}
- private static void leftMultByDenseMatrix(List<ColGroup> colGroups, MatrixBlock that, MatrixBlock ret, int k,
+ private static MatrixBlock leftMultByDenseMatrix(List<ColGroup> colGroups, MatrixBlock that, MatrixBlock ret, int k,
int numColumns) {
DenseBlock db = that.getDenseBlock();
+ if(db == null)
+ throw new DMLRuntimeException("Invalid LeftMult By Dense matrix, input matrix was sparse");
+
double[] retV = ret.getDenseBlockValues();
double[] thatV;
int blockU;
int blockL = 0;
-
- for(ColGroup grp : colGroups) {
+ for(ColGroup grp : colGroups)
if(grp instanceof ColGroupUncompressed)
((ColGroupUncompressed) grp).leftMultByMatrix(that, ret);
- }
- for(int b = 0; b <= db.numBlocks(); b++) {
+ for(int b = 0; b < db.numBlocks(); b++) {
int blockSize = db.blockSize(b);
blockU = Math.min(blockL + blockSize, ret.getNumRows());
thatV = db.valuesAt(b);
if(k == 1) {
- // OPTIMIZE dont allocate colGroups for the uncompressable columns.
- // Optimize allocate once the materialization for all the blocks.
- double[][] materialized = new double[colGroups.size()][];
- for(int i = 0; i < colGroups.size(); i++) {
- materialized[i] = colGroups.get(i).getValues();
- }
- Pair<Integer, List<Integer>> v = getMaxNumValues(colGroups);
- for(int j = 0; j < colGroups.size(); j++)
+ Pair<Integer, int[]> v = getMaxNumValues(colGroups);
+ for(int j = 0; j < colGroups.size(); j++) {
colGroups.get(j).leftMultByMatrix(thatV,
retV,
- v.getRight().get(j),
- materialized[j],
+ v.getRight()[j],
+ colGroups.get(j).getValues(),
that.getNumRows(),
ret.getNumColumns(),
- blockL,
- blockU,
+ 0,
+ ret.getNumRows(),
0);
+ }
}
else {
try {
- ExecutorService pool = CommonThreadPool.get(Math.min(colGroups.size(), k));
+ ExecutorService pool = CommonThreadPool.get(k);
// compute remaining compressed column groups in parallel
ArrayList<LeftMatrixMatrixMultTask> tasks = new ArrayList<>();
List<ColGroup>[] parts = createStaticTaskPartitioningForMatrixMult(colGroups, k, false);
@@ -1237,7 +1189,6 @@
for(int blo = blockL; blo < blockU; blo += rowBlockSize) {
tasks.add(new LeftMatrixMatrixMultTask(part, thatV, retV, that.getNumRows(), numColumns,
blo, Math.min(blo + rowBlockSize, blockU), blo - blockL));
-
}
}
@@ -1248,50 +1199,270 @@
future.get();
}
catch(InterruptedException | ExecutionException e) {
- LOG.error(e);
throw new DMLRuntimeException(e);
}
}
blockL += blockSize;
}
+ return ret;
}
- // private static void leftMultBySparseMatrix(List<ColGroup> colGroups, MatrixBlock that, MatrixBlock ret, int k,
- // int numColumns) {
- // // SparseBlock sb = that.getSparseBlock();
- // throw new NotImplementedException("Sparse Block input not handled.");
- // }
+ private static MatrixBlock leftMultBySparseMatrix(List<ColGroup> colGroups, MatrixBlock that, MatrixBlock ret,
+ int k, int numColumns) {
- private static void rightMultByDenseMatrix(List<ColGroup> colGroups, MatrixBlock that, MatrixBlock ret, int k,
- int numColumns) {
- DenseBlock db = that.getDenseBlock();
- double[] retV = ret.getDenseBlockValues();
- double[] thatV;
- int blockU;
- int blockL = 0;
+ SparseBlock sb = that.getSparseBlock();
+ if(sb == null)
+ throw new DMLRuntimeException("Invalid Left Mult by Sparse matrix, input matrix was dense");
for(ColGroup grp : colGroups) {
if(grp instanceof ColGroupUncompressed)
- ((ColGroupUncompressed) grp).rightMultByMatrix(that, ret, 0, ret.getNumRows());
+ ((ColGroupUncompressed) grp).leftMultByMatrix(that, ret);
}
- // OPTIMIZE dont allocate colGroups for the uncompressable columns.
- double[][] materialized = new double[colGroups.size()][];
-
- for(int b = 0; b <= db.numBlocks(); b++) {
- int blockSize = db.blockSize(b);
- blockU = Math.min(blockL + blockSize, ret.getNumRows());
- thatV = db.valuesAt(b);
-
+ if(k == 1) {
+ double[][] materialized = new double[colGroups.size()][];
+ boolean containsOLE = false;
for(int i = 0; i < colGroups.size(); i++) {
materialized[i] = colGroups.get(i).getValues();
+ if(colGroups.get(i) instanceof ColGroupOLE) {
+ containsOLE = true;
+ }
}
- Pair<Integer, List<Integer>> v = getMaxNumValues(colGroups);
- for(int j = 0; j < colGroups.size(); j++) {
- colGroups.get(j)
- .rightMultByMatrix(thatV, retV, v.getRight().get(j), materialized[j], blockL, blockU, 0);
+ double[] materializedRow = containsOLE ? new double[CompressionSettings.BITMAP_BLOCK_SZ * 2] : null;
+
+ Pair<Integer, int[]> v = getMaxNumValues(colGroups);
+ for(int r = 0; r < that.getNumRows(); r++) {
+ SparseRow row = sb.get(r);
+ if(row != null) {
+
+ for(int j = 0; j < colGroups.size(); j++) {
+ colGroups.get(j).leftMultBySparseMatrix(row.size(),
+ row.indexes(),
+ row.values(),
+ ret.getDenseBlockValues(),
+ v.getRight()[j],
+ materialized[j],
+ that.getNumRows(),
+ ret.getNumColumns(),
+ r,
+ materializedRow);
+ }
+ }
}
}
+ else {
+ ExecutorService pool = CommonThreadPool.get(k);
+ ArrayList<LeftMatrixSparseMatrixMultTask> tasks = new ArrayList<>();
+ try {
+ // compute remaining compressed column groups in parallel
+ List<ColGroup>[] parts = createStaticTaskPartitioningForSparseMatrixMult(colGroups, k, false);
+ for(List<ColGroup> part : parts) {
+ tasks.add(new LeftMatrixSparseMatrixMultTask(part, sb, ret.getDenseBlockValues(), that.getNumRows(),
+ numColumns));
+ }
+
+ List<Future<Object>> futures = pool.invokeAll(tasks);
+ pool.shutdown();
+ for(Future<Object> future : futures)
+ future.get();
+ }
+ catch(InterruptedException | ExecutionException e) {
+ throw new DMLRuntimeException(e);
+ }
+ }
+
+ return ret;
+
+ }
+
+ private static MatrixBlock rightMultByDenseMatrix(List<ColGroup> colGroups, MatrixBlock that, MatrixBlock ret,
+ int k, int numColumns) {
+
+ // long StartTime = System.currentTimeMillis();
+ DenseBlock db = that.getDenseBlock();
+ double[] retV = ret.getDenseBlockValues();
+ double[] thatV;
+
+ for(ColGroup grp : colGroups) {
+ if(grp instanceof ColGroupUncompressed) {
+ ((ColGroupUncompressed) grp).rightMultByMatrix(that, ret, 0, ret.getNumRows());
+ }
+ }
+
+ if(k == 1) {
+ Pair<Integer, int[]> v = getMaxNumValues(colGroups);
+ ColGroupValue.setupThreadLocalMemory((v.getLeft()) * that.getNumColumns());
+ for(int b = 0; b < db.numBlocks(); b++) {
+ // int blockSize = db.blockSize(b);
+ thatV = db.valuesAt(b);
+ for(int j = 0; j < colGroups.size(); j++) {
+ int colBlockSize = 128;
+ for(int i = 0; i < that.getNumColumns(); i += colBlockSize) {
+ if(colGroups.get(j) instanceof ColGroupValue) {
+ double[] preAggregatedB = ((ColGroupValue) colGroups.get(j)).preaggValues(v.getRight()[j],
+ thatV,
+ colGroups.get(j).getValues(),
+ i,
+ Math.min(i + colBlockSize, that.getNumColumns()),
+ that.getNumColumns());
+ int blklenRows = CompressionSettings.BITMAP_BLOCK_SZ;
+ for(int n = 0; n * blklenRows < ret.getNumRows(); n++) {
+ colGroups.get(j).rightMultByMatrix(preAggregatedB,
+ retV,
+ numColumns,
+ n * blklenRows,
+ Math.min((n + 1) * blklenRows, ret.getNumRows()),
+ i,
+ Math.min(i + colBlockSize, that.getNumColumns()));
+ }
+ }
+ }
+ }
+ }
+ ColGroupValue.cleanupThreadLocalMemory();
+ }
+ else {
+ // for(int b = 0; b < db.numBlocks(); b++) {
+ // compute remaining compressed column groups in parallel
+ // int blockSize = db.blockSize(b);
+ // int blockUCols = that.getNumColumns();
+ thatV = db.valuesAt(0);
+ ExecutorService pool = CommonThreadPool.get(k);
+ ArrayList<RightMatrixMultTask> tasks = new ArrayList<>();
+ ArrayList<RightMatrixPreAggregateTask> preTask = new ArrayList<>(colGroups.size());
+ Pair<Integer, int[]> v;
+ final int blkz = CompressionSettings.BITMAP_BLOCK_SZ;
+ int blklenRows = (int) (Math.ceil((double) ret.getNumRows() / (4 * k)));
+
+ List<ColGroup> ddcGroups = new ArrayList<>();
+ List<ColGroup> oleGroups = new ArrayList<>();
+ List<ColGroup> rleGroups = new ArrayList<>();
+ for(ColGroup g : colGroups) {
+ if(g instanceof ColGroupDDC) {
+ ddcGroups.add(g);
+ }
+ else if(g instanceof ColGroupOLE) {
+ oleGroups.add(g);
+ }
+ else if(g instanceof ColGroupRLE) {
+ rleGroups.add(g);
+ }
+ }
+
+ try {
+ // Process DDC Groups!
+ // int blklenRows = CompressionSettings.BITMAP_BLOCK_SZ;
+ v = getMaxNumValues(ddcGroups);
+ List<Future<double[]>> ag = pool.invokeAll(preAggregate(ddcGroups, thatV, that, preTask, v));
+
+ for(int j = 0; j * blklenRows < ret.getNumRows(); j++) {
+ RightMatrixMultTask rmmt = new RightMatrixMultTask(ddcGroups, retV, ag, v, numColumns,
+ j * blklenRows, Math.min((j + 1) * blklenRows, ret.getNumRows()), 0, that.getNumColumns(),
+ false);
+ tasks.add(rmmt);
+ }
+ for(Future<Object> future : pool.invokeAll(tasks))
+ future.get();
+ tasks.clear();
+
+ // Process RLE Groups
+ blklenRows += (blklenRows % blkz != 0) ? blkz - blklenRows % blkz : 0;
+ v = getMaxNumValues(rleGroups);
+ preTask = preAggregate(rleGroups, thatV, that, preTask, v);
+ for(int j = 0; j * blklenRows < ret.getNumRows(); j++) {
+ RightMatrixMultTask rmmt = new RightMatrixMultTask(rleGroups, retV, pool.invokeAll(preTask), v,
+ numColumns, j * blklenRows, Math.min((j + 1) * blklenRows, ret.getNumRows()), 0,
+ that.getNumColumns(), true);
+ tasks.add(rmmt);
+ }
+
+ for(Future<Object> future : pool.invokeAll(tasks))
+ future.get();
+ tasks.clear();
+
+ // Process OLE Groups
+ // blklenRows += (blklenRows % blkz != 0) ? blkz - blklenRows % blkz : 0;
+
+ v = getMaxNumValues(oleGroups);
+ preTask = preAggregate(oleGroups, thatV, that, preTask, v);
+ for(int j = 0; j * blklenRows < ret.getNumRows(); j++) {
+ RightMatrixMultTask rmmt = new RightMatrixMultTask(oleGroups, retV, pool.invokeAll(preTask), v,
+ numColumns, j * blklenRows, Math.min((j + 1) * blklenRows, ret.getNumRows()), 0,
+ that.getNumColumns(), true);
+ tasks.add(rmmt);
+ }
+ for(Future<Object> future : pool.invokeAll(tasks))
+ future.get();
+ pool.shutdown();
+ }
+ catch(InterruptedException | ExecutionException e) {
+ throw new DMLRuntimeException(e);
+ }
+ // }
+ }
+
+ return ret;
+ }
+
+ private static ArrayList<RightMatrixPreAggregateTask> preAggregate(List<ColGroup> colGroups, double[] thatV,
+ MatrixBlock that, ArrayList<RightMatrixPreAggregateTask> preTask, Pair<Integer, int[]> v) {
+ preTask.clear();
+ for(int h = 0; h < colGroups.size(); h++) {
+ RightMatrixPreAggregateTask pAggT = new RightMatrixPreAggregateTask((ColGroupValue) colGroups.get(h),
+ v.getRight()[h], thatV, colGroups.get(h).getValues(), 0, that.getNumColumns(), that.getNumColumns());
+ preTask.add(pAggT);
+ }
+ return preTask;
+ }
+
+ private static MatrixBlock rightMultBySparseMatrix(List<ColGroup> colGroups, MatrixBlock that, MatrixBlock ret,
+ int k, int numColumns) {
+ SparseBlock sb = that.getSparseBlock();
+ double[] retV = ret.getDenseBlockValues();
+
+ if(sb == null)
+ throw new DMLRuntimeException("Invalid Right Mult by Sparse matrix, input matrix was dense");
+
+ for(ColGroup grp : colGroups) {
+ if(grp instanceof ColGroupUncompressed)
+ ((ColGroupUncompressed) grp).rightMultByMatrix(that, ret, 0, ret.getNumColumns());
+ }
+
+ Pair<Integer, int[]> v = getMaxNumValues(colGroups);
+ // if(k == 1) {
+ for(int j = 0; j < colGroups.size(); j++) {
+ double[] preAggregatedB = ((ColGroupValue) colGroups.get(j)).preaggValues(v.getRight()[j],
+ sb,
+ colGroups.get(j).getValues(),
+ 0,
+ that.getNumColumns(),
+ that.getNumColumns());
+ colGroups.get(j)
+ .rightMultByMatrix(preAggregatedB, retV, numColumns, 0, ret.getNumRows(), 0, that.getNumColumns());
+
+ }
+ // }
+ // else {
+ // ExecutorService pool = CommonThreadPool.get(k);
+ // ArrayList<RightMultBySparseMatrixTask> tasks = new ArrayList<>();
+ // try {
+
+ // for(int j = 0; j < ret.getNumColumns(); j += CompressionSettings.BITMAP_BLOCK_SZ) {
+ // tasks.add(new RightMultBySparseMatrixTask(colGroups, retV, sb, materialized, v, numColumns, j,
+ // Math.min(j + CompressionSettings.BITMAP_BLOCK_SZ, ret.getNumColumns())));
+ // }
+
+ // List<Future<Object>> futures = pool.invokeAll(tasks);
+ // pool.shutdown();
+ // for(Future<Object> future : futures)
+ // future.get();
+ // }
+ // catch(InterruptedException | ExecutionException e) {
+ // throw new DMLRuntimeException(e);
+ // }
+ // }
+
+ return ret;
}
private static void leftMultByTransposeSelf(List<ColGroup> groups, MatrixBlock result, int gl, int gu) {
@@ -1306,7 +1477,7 @@
tmpret.allocateDenseBlock();
// setup memory pool for reuse
- ColGroupValue.setupThreadLocalMemory(getMaxNumValues(groups).getLeft());
+ ColGroupValue.setupThreadLocalMemory(getMaxNumValues(groups).getLeft() + 1);
// approach: for each colgroup, extract uncompressed columns one at-a-time
// vector-matrix multiplies against remaining col groups
@@ -1396,18 +1567,45 @@
return grpParts;
}
- private static Pair<Integer, List<Integer>> getMaxNumValues(List<ColGroup> groups) {
+ @SuppressWarnings("unchecked")
+ private static List<ColGroup>[] createStaticTaskPartitioningForSparseMatrixMult(List<ColGroup> colGroups, int k,
+ boolean inclUncompressed) {
+ int numTasks = Math.min(k, colGroups.size());
+ List<ColGroup>[] grpParts = new ArrayList[numTasks];
+ int pos = 0;
+ for(int i = 0; i < numTasks; i++) {
+ grpParts[pos++] = new ArrayList<>();
+ }
+ pos = 0;
+ for(ColGroup grp : colGroups) {
+
+ if(grp instanceof ColGroupOLE) {
+ grpParts[pos].add((ColGroupOLE) grp);
+ pos = (pos == numTasks - 1) ? 0 : pos + 1;
+ }
+ }
+ for(ColGroup grp : colGroups) {
+ if(!(grp instanceof ColGroupOLE) && (inclUncompressed || !(grp instanceof ColGroupUncompressed))) {
+ grpParts[pos].add(grp);
+ pos = (pos == numTasks - 1) ? 0 : pos + 1;
+ }
+ }
+
+ return grpParts;
+ }
+
+ private static Pair<Integer, int[]> getMaxNumValues(List<ColGroup> groups) {
int numVals = 1;
- List<Integer> numValues = new ArrayList<>(groups.size());
+ int[] numValues = new int[groups.size()];
int nr;
- for(ColGroup grp : groups)
- if(grp instanceof ColGroupValue) {
- nr = ((ColGroupValue) grp).getNumValues();
- numValues.add(nr);
+ for(int i = 0; i < groups.size(); i++)
+ if(groups.get(i) instanceof ColGroupValue) {
+ nr = ((ColGroupValue) groups.get(i)).getNumValues();
+ numValues[i] = nr;
numVals = Math.max(numVals, nr);
}
else {
- numValues.add(-1);
+ numValues[i] = -1;
}
return new ImmutablePair<>(numVals, numValues);
}
@@ -1439,12 +1637,11 @@
public Object call() {
// setup memory pool for reuse
try {
- Pair<Integer, List<Integer>> v = getMaxNumValues(_groups);
- ColGroupValue.setupThreadLocalMemory(v.getLeft());
+ Pair<Integer, int[]> v = getMaxNumValues(_groups);
+ ColGroupValue.setupThreadLocalMemory(v.getLeft() + 1);
for(int i = 0; i < _groups.size(); i++) {
- _groups.get(i).leftMultByRowVector(_vect.getDenseBlockValues(),
- _ret.getDenseBlockValues(),
- v.getRight().get(i));
+ _groups.get(i)
+ .leftMultByRowVector(_vect.getDenseBlockValues(), _ret.getDenseBlockValues(), v.getRight()[i]);
}
ColGroupValue.cleanupThreadLocalMemory();
@@ -1486,13 +1683,13 @@
for(int i = 0; i < _group.size(); i++) {
materialized[i] = _group.get(i).getValues();
}
- Pair<Integer, List<Integer>> v = getMaxNumValues(_group);
+ Pair<Integer, int[]> v = getMaxNumValues(_group);
try {
- ColGroupValue.setupThreadLocalMemory(v.getLeft());
+ ColGroupValue.setupThreadLocalMemory(v.getLeft() + 1);
for(int j = 0; j < _group.size(); j++) {
_group.get(j).leftMultByMatrix(_that,
_ret,
- v.getRight().get(j),
+ v.getRight()[j],
materialized[j],
_numRows,
_numCols,
@@ -1510,14 +1707,78 @@
}
}
- private static class RightMatrixMultTask implements Callable<Long> {
+ private static class LeftMatrixSparseMatrixMultTask implements Callable<Object> {
+ private final List<ColGroup> _group;
+ private final SparseBlock _that;
+ private final double[] _ret;
+ private final int _numRows;
+ private final int _numCols;
+
+ protected LeftMatrixSparseMatrixMultTask(List<ColGroup> group, SparseBlock that, double[] ret, int numRows,
+ int numCols) {
+ _group = group;
+ _that = that;
+ _ret = ret;
+ _numRows = numRows;
+ _numCols = numCols;
+ }
+
+ @Override
+ public Object call() {
+ // setup memory pool for reuse
+
+ // double[][] materialized = new double[_group.size()][];
+ // for(int i = 0; i < _group.size(); i++) {
+ // materialized[i] = _group.get(i).getValues();
+ // }
+
+ boolean containsOLE = false;
+ for(int j = 0; j < _group.size(); j++) {
+ if(_group.get(j) instanceof ColGroupOLE) {
+ containsOLE = true;
+ }
+ }
+ // Temporary Array to store 2 * block size in
+ double[] tmpA = containsOLE ? new double[CompressionSettings.BITMAP_BLOCK_SZ * 2] : null;
+
+ Pair<Integer, int[]> v = getMaxNumValues(_group);
+ ColGroupValue.setupThreadLocalMemory(v.getLeft());
+ try {
+ for(int j = 0; j < _group.size(); j++) {
+ double[] materializedV = _group.get(j).getValues();
+ for(int r = 0; r < _that.numRows(); r++) {
+ if(_that.get(r) != null) {
+ _group.get(j).leftMultBySparseMatrix(_that.get(r).size(),
+ _that.get(r).indexes(),
+ _that.get(r).values(),
+ _ret,
+ v.getRight()[j],
+ materializedV,
+ _numRows,
+ _numCols,
+ r,
+ tmpA);
+ }
+ }
+ }
+ }
+ catch(Exception e) {
+ e.printStackTrace();
+ throw new DMLRuntimeException(e);
+ }
+ ColGroupValue.cleanupThreadLocalMemory();
+ return null;
+ }
+ }
+
+ private static class RightMatrixVectorMultTask implements Callable<Long> {
private final List<ColGroup> _groups;
private final MatrixBlock _vect;
private final MatrixBlock _ret;
private final int _rl;
private final int _ru;
- protected RightMatrixMultTask(List<ColGroup> groups, MatrixBlock vect, MatrixBlock ret, int rl, int ru) {
+ protected RightMatrixVectorMultTask(List<ColGroup> groups, MatrixBlock vect, MatrixBlock ret, int rl, int ru) {
_groups = groups;
_vect = vect;
_ret = ret;
@@ -1538,6 +1799,88 @@
}
}
+ private static class RightMatrixMultTask implements Callable<Object> {
+ private final List<ColGroup> _colGroups;
+ // private final double[] _thatV;
+ private final double[] _retV;
+ private final List<Future<double[]>> _aggB;
+ private final Pair<Integer, int[]> _v;
+ private final int _numColumns;
+
+ private final int _rl;
+ private final int _ru;
+ private final int _cl;
+ private final int _cu;
+ private final boolean _mem;
+
+ protected RightMatrixMultTask(List<ColGroup> groups, double[] retV, List<Future<double[]>> aggB,
+ Pair<Integer, int[]> v, int numColumns, int rl, int ru, int cl, int cu, boolean mem) {
+ _colGroups = groups;
+ // _thatV = thatV;
+ _retV = retV;
+ _aggB = aggB;
+ _v = v;
+ _numColumns = numColumns;
+ _rl = rl;
+ _ru = ru;
+ _cl = cl;
+ _cu = cu;
+ _mem = mem;
+ }
+
+ @Override
+ public Object call() {
+ try {
+ if(_mem)
+ ColGroupValue.setupThreadLocalMemory((_v.getLeft()));
+ for(int j = 0; j < _colGroups.size(); j++) {
+ // if (_colGroups.get(j) instanceof ColGroupRLE)
+ _colGroups.get(j).rightMultByMatrix(_aggB.get(j).get(), _retV, _numColumns, _rl, _ru, _cl, _cu);
+ }
+ if(_mem)
+ ColGroupValue.cleanupThreadLocalMemory();
+ return null;
+ }
+ catch(Exception e) {
+ LOG.error(e);
+ throw new DMLRuntimeException(e);
+ }
+ }
+ }
+
+ private static class RightMatrixPreAggregateTask implements Callable<double[]> {
+ private final ColGroupValue _colGroup;
+ private final int _numVals;
+ private final double[] _b;
+ private final double[] _dict;
+
+ private final int _cl;
+ private final int _cu;
+ private final int _cut;
+
+ protected RightMatrixPreAggregateTask(ColGroupValue colGroup, int numVals, double[] b, double[] dict, int cl,
+ int cu, int cut) {
+ _colGroup = colGroup;
+ _numVals = numVals;
+ _b = b;
+ _dict = dict;
+ _cl = cl;
+ _cu = cu;
+ _cut = cut;
+ }
+
+ @Override
+ public double[] call() {
+ try {
+ return _colGroup.preaggValues(_numVals, _b, _dict, _cl, _cu, _cut);
+ }
+ catch(Exception e) {
+ LOG.error(e);
+ throw new DMLRuntimeException(e);
+ }
+ }
+ }
+
private static class MatrixMultTransposeTask implements Callable<Object> {
private final List<ColGroup> _groups;
private final MatrixBlock _ret;
diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
index 4a0632f..ced8d63 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressedMatrixBlockFactory.java
@@ -50,7 +50,8 @@
return compress(mb, 1, defaultCompressionSettings);
}
- public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb, CompressionSettings customSettings) {
+ public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb,
+ CompressionSettings customSettings) {
return compress(mb, 1, customSettings);
}
@@ -73,7 +74,8 @@
* @param compSettings The Compression settings used
* @return A compressed matrix block.
*/
- public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb, int k, CompressionSettings compSettings) {
+ public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb, int k,
+ CompressionSettings compSettings) {
// Check for redundant compression
if(mb instanceof CompressedMatrixBlock) {
throw new DMLRuntimeException("Redundant compression, block already compressed.");
@@ -120,9 +122,15 @@
// --------------------------------------------------
// PHASE 2: Grouping columns
// Divide the columns into column groups.
- List<int[]> coCodeColGroups = PlanningCoCoder.findCoCodesByPartitioning(sizeEstimator, sizeInfos, numRows, k, compSettings);
+ List<int[]> coCodeColGroups = PlanningCoCoder
+ .findCoCodesByPartitioning(sizeEstimator, sizeInfos, numRows, k, compSettings);
_stats.setNextTimePhase(time.stop());
- LOG.debug("--compression phase 2: " + _stats.getLastTimePhase());
+ if(LOG.isDebugEnabled()) {
+
+ LOG.debug("--compression phase 2: " + _stats.getLastTimePhase());
+ for(int[] group : coCodeColGroups)
+ LOG.debug(Arrays.toString(group));
+ }
// TODO: Make second estimate of memory usage if the ColGroups are as above?
// This should already be done inside the PlanningCoCoder, and therefore this information
@@ -150,10 +158,10 @@
// --------------------------------------------------
// PHASE 4: Best-effort dictionary sharing for DDC1 single-col groups
// Dictionary dict = (!(compSettings.validCompressions.contains(CompressionType.DDC)) ||
- // !(compSettings.allowSharedDDCDictionary)) ? null : createSharedDDC1Dictionary(colGroupList);
+ // !(compSettings.allowSharedDDCDictionary)) ? null : createSharedDDC1Dictionary(colGroupList);
// if(dict != null) {
- // applySharedDDC1Dictionary(colGroupList, dict);
- // res._sharedDDC1Dict = true;
+ // applySharedDDC1Dictionary(colGroupList, dict);
+ // res._sharedDDC1Dict = true;
// }
// _stats.setNextTimePhase(time.stop());
if(LOG.isDebugEnabled()) {
@@ -180,18 +188,21 @@
_stats.setNextTimePhase(time.stop());
_stats.setColGroupsCounts(colGroupList);
- LOG.debug("--num col groups: " + colGroupList.size() + ", -- num input cols: " + numCols);
- LOG.debug("--compression phase 5: " + _stats.getLastTimePhase());
- LOG.debug("--col groups types " + _stats.getGroupsTypesString());
- LOG.debug("--col groups sizes " + _stats.getGroupsSizesString());
- LOG.debug("--compressed size: " + _stats.size);
- LOG.debug("--compression ratio: " + _stats.ratio);
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("--num col groups: " + colGroupList.size() + ", -- num input cols: " + numCols);
+ LOG.debug("--compression phase 5: " + _stats.getLastTimePhase());
+ LOG.debug("--col groups types " + _stats.getGroupsTypesString());
+ LOG.debug("--col groups sizes " + _stats.getGroupsSizesString());
+ LOG.debug("--compressed size: " + _stats.size);
+ LOG.debug("--compression ratio: " + _stats.ratio);
- if( LOG.isTraceEnabled()){
- for (ColGroup colGroup : colGroupList) {
- LOG.trace("--colGroups colIndexes : " + Arrays.toString(colGroup.getColIndices()));
- LOG.trace("--colGroups type : " + colGroup.getClass().getSimpleName());
- LOG.trace("--colGroups Values : " + Arrays.toString(colGroup.getValues()));
+ if(LOG.isTraceEnabled()) {
+ for(ColGroup colGroup : colGroupList) {
+ LOG.trace("--colGroups colIndexes : " + Arrays.toString(colGroup.getColIndices()));
+ LOG.trace("--colGroups type : " + colGroup.getClass().getSimpleName());
+ LOG.trace("--colGroups Values : " + Arrays.toString(colGroup.getValues()));
+ }
}
}
@@ -206,59 +217,59 @@
* @return the shared value list for the DDC ColGroups.
*/
// private static Dictionary createSharedDDC1Dictionary(List<ColGroup> colGroups) {
- // // create joint dictionary
- // HashSet<Double> vals = new HashSet<>();
- // HashMap<Integer, Double> mins = new HashMap<>();
- // HashMap<Integer, Double> maxs = new HashMap<>();
- // int numDDC1 = 0;
- // for(final ColGroup grp : colGroups)
- // if(grp.getNumCols() == 1 && grp instanceof ColGroupDDC1) {
- // final ColGroupDDC1 grpDDC1 = (ColGroupDDC1) grp;
- // final double[] values = grpDDC1.getValues();
- // double min = Double.POSITIVE_INFINITY;
- // double max = Double.NEGATIVE_INFINITY;
- // for(int i = 0; i < values.length; i++) {
- // vals.add(values[i]);
- // min = Math.min(min, values[i]);
- // max = Math.max(max, values[i]);
- // }
- // mins.put(grpDDC1.getColIndex(0), min);
- // maxs.put(grpDDC1.getColIndex(0), max);
- // numDDC1++;
- // }
+ // // create joint dictionary
+ // HashSet<Double> vals = new HashSet<>();
+ // HashMap<Integer, Double> mins = new HashMap<>();
+ // HashMap<Integer, Double> maxs = new HashMap<>();
+ // int numDDC1 = 0;
+ // for(final ColGroup grp : colGroups)
+ // if(grp.getNumCols() == 1 && grp instanceof ColGroupDDC1) {
+ // final ColGroupDDC1 grpDDC1 = (ColGroupDDC1) grp;
+ // final double[] values = grpDDC1.getValues();
+ // double min = Double.POSITIVE_INFINITY;
+ // double max = Double.NEGATIVE_INFINITY;
+ // for(int i = 0; i < values.length; i++) {
+ // vals.add(values[i]);
+ // min = Math.min(min, values[i]);
+ // max = Math.max(max, values[i]);
+ // }
+ // mins.put(grpDDC1.getColIndex(0), min);
+ // maxs.put(grpDDC1.getColIndex(0), max);
+ // numDDC1++;
+ // }
- // // abort shared dictionary creation if empty or too large
- // int maxSize = vals.contains(0d) ? 256 : 255;
- // if(numDDC1 < 2 || vals.size() > maxSize)
- // return null;
+ // // abort shared dictionary creation if empty or too large
+ // int maxSize = vals.contains(0d) ? 256 : 255;
+ // if(numDDC1 < 2 || vals.size() > maxSize)
+ // return null;
- // // build consolidated shared dictionary
- // double[] values = vals.stream().mapToDouble(Double::doubleValue).toArray();
- // int[] colIndexes = new int[numDDC1];
- // double[] extrema = new double[2 * numDDC1];
- // int pos = 0;
- // for(Entry<Integer, Double> e : mins.entrySet()) {
- // colIndexes[pos] = e.getKey();
- // extrema[2 * pos] = e.getValue();
- // extrema[2 * pos + 1] = maxs.get(e.getKey());
- // pos++;
- // }
- // return new DictionaryShared(values, colIndexes, extrema);
+ // // build consolidated shared dictionary
+ // double[] values = vals.stream().mapToDouble(Double::doubleValue).toArray();
+ // int[] colIndexes = new int[numDDC1];
+ // double[] extrema = new double[2 * numDDC1];
+ // int pos = 0;
+ // for(Entry<Integer, Double> e : mins.entrySet()) {
+ // colIndexes[pos] = e.getKey();
+ // extrema[2 * pos] = e.getValue();
+ // extrema[2 * pos + 1] = maxs.get(e.getKey());
+ // pos++;
+ // }
+ // return new DictionaryShared(values, colIndexes, extrema);
// }
// private static void applySharedDDC1Dictionary(List<ColGroup> colGroups, Dictionary dict) {
- // // create joint mapping table
- // HashMap<Double, Integer> map = new HashMap<>();
- // double[] values = dict.getValues();
- // for(int i = 0; i < values.length; i++)
- // map.put(values[i], i);
+ // // create joint mapping table
+ // HashMap<Double, Integer> map = new HashMap<>();
+ // double[] values = dict.getValues();
+ // for(int i = 0; i < values.length; i++)
+ // map.put(values[i], i);
- // // recode data of all relevant DDC1 groups
- // for(ColGroup grp : colGroups)
- // if(grp.getNumCols() == 1 && grp instanceof ColGroupDDC1) {
- // ColGroupDDC1 grpDDC1 = (ColGroupDDC1) grp;
- // grpDDC1.recodeData(map);
- // grpDDC1.setDictionary(dict);
- // }
+ // // recode data of all relevant DDC1 groups
+ // for(ColGroup grp : colGroups)
+ // if(grp.getNumCols() == 1 && grp instanceof ColGroupDDC1) {
+ // ColGroupDDC1 grpDDC1 = (ColGroupDDC1) grp;
+ // grpDDC1.recodeData(map);
+ // grpDDC1.setDictionary(dict);
+ // }
// }
}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java
index 02620d0..2fe6642 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/CompressionSettingsBuilder.java
@@ -39,7 +39,7 @@
private boolean lossy = false;
private EnumSet<CompressionType> validCompressions;
private boolean sortValuesByLength = false;
- private PartitionerType columnPartitioner = PartitionerType.STATIC; // BIN_PACKING or STATIC
+ private PartitionerType columnPartitioner = PartitionerType.COST;
private int maxStaticColGroupCoCode = 1;
public CompressionSettingsBuilder() {
diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/ColumnGroupPartitionerCost.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/ColumnGroupPartitionerCost.java
new file mode 100644
index 0000000..24f33db
--- /dev/null
+++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/ColumnGroupPartitionerCost.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sysds.runtime.compress.cocode;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.sysds.runtime.compress.CompressionSettings;
+import org.apache.sysds.runtime.compress.cocode.PlanningCoCoder.GroupableColInfo;
+
+/**
+ * Column group partitioning with static number distinct elements heuristic
+ */
+public class ColumnGroupPartitionerCost extends ColumnGroupPartitioner {
+ private static final Log LOG = LogFactory.getLog(ColumnGroupPartitionerCost.class.getName());
+ /**
+ * This value specifies the maximum distinct count allowed int a coCoded group. Note that this value is the number
+ * of distinct rows not the total number of values. That value can be calculated by multiplying with the number of
+ * rows in the coCoded group.
+ */
+ private static final int largestDistinct = 512;
+
+ @Override
+ public List<int[]> partitionColumns(List<Integer> groupCols, HashMap<Integer, GroupableColInfo> groupColsInfo,
+ CompressionSettings cs) {
+
+ TreeMap<Integer, Queue<Queue<Integer>>> distToColId = new TreeMap<>();
+ for(Entry<Integer, GroupableColInfo> ent : groupColsInfo.entrySet()) {
+ int distinct = ent.getValue().nrDistinct;
+ if(distToColId.containsKey(distinct)) {
+ Queue<Integer> cocodeGroup = new LinkedList<>();
+ cocodeGroup.add(ent.getKey());
+ distToColId.get(distinct).add(cocodeGroup);
+ }
+ else {
+ Queue<Queue<Integer>> cocodeGroups = new LinkedList<>();
+ Queue<Integer> cocodeGroup = new LinkedList<>();
+ cocodeGroup.add(ent.getKey());
+ cocodeGroups.add(cocodeGroup);
+ distToColId.put(distinct, cocodeGroups);
+ }
+ }
+
+ boolean change = false;
+ while(distToColId.firstKey() < largestDistinct) {
+ Entry<Integer, Queue<Queue<Integer>>> elm = distToColId.pollFirstEntry();
+ if(elm.getValue().size() > 1) {
+ int distinctCombinations = elm.getKey()>0 ? elm.getKey() : 1;
+ Queue<Queue<Integer>> group = elm.getValue();
+ int size = group.size();
+ if(Math.pow(distinctCombinations, size) < largestDistinct) {
+ Queue<Integer> t = elm.getValue().stream().reduce(new LinkedList<>(), (acc, e) -> {
+ acc.addAll(e);
+ return acc;
+ });
+ elm.getValue().clear();
+ if(distToColId.containsKey((int) Math.pow(distinctCombinations, size))){
+ distToColId.get((int) Math.pow(distinctCombinations, size)).add(t);
+ }else{
+ elm.getValue().add(t);
+ distToColId.put((int) Math.pow(distinctCombinations, size), elm.getValue());
+ }
+ change = true;
+ }
+ else if(distinctCombinations * distinctCombinations < largestDistinct) {
+ Queue<Integer> cols = elm.getValue().poll();
+ cols.addAll(elm.getValue().poll());
+ if(distToColId.containsKey(distinctCombinations * distinctCombinations)) {
+ Queue<Queue<Integer>> p = distToColId.get(distinctCombinations * distinctCombinations);
+ p.add(cols);
+ }
+ else {
+ Queue<Queue<Integer>> n = new LinkedList<>();
+ n.add(cols);
+ distToColId.put(distinctCombinations * distinctCombinations, n);
+ }
+ if(elm.getValue().size() > 0) {
+ distToColId.put(elm.getKey(), elm.getValue());
+ }
+ change = true;
+ }
+ else {
+ change = false;
+ distToColId.put(elm.getKey(), elm.getValue());
+ }
+ }
+ else if(!distToColId.isEmpty()) {
+ Entry<Integer, Queue<Queue<Integer>>> elm2 = distToColId.pollFirstEntry();
+ int size1 = elm.getKey()>0 ? elm.getKey() : 1;
+ int size2 = elm2.getKey()>0 ? elm2.getKey() : 1;
+ if(size1 * size2 < largestDistinct) {
+ Queue<Integer> cols = elm.getValue().poll();
+ cols.addAll(elm2.getValue().poll());
+ if(elm2.getKey() == size1 * size2){
+ elm2.getValue().add(cols);
+ }
+ else if(distToColId.containsKey(size1 * size2)) {
+ distToColId.get(size1 * size2).add(cols);
+ }
+ else {
+ Queue<Queue<Integer>> n = new LinkedList<>();
+ n.add(cols);
+ distToColId.put(size1 * size2, n);
+ }
+ if(elm.getValue().size() > 0) {
+ distToColId.put(elm.getKey(), elm.getValue());
+ }
+ if(elm2.getValue().size() > 0) {
+ distToColId.put(elm2.getKey(), elm2.getValue());
+ }
+ change = true;
+ }
+ else {
+ change = false;
+ distToColId.put(elm.getKey(), elm.getValue());
+ distToColId.put(elm2.getKey(), elm2.getValue());
+ }
+ }
+ else {
+ distToColId.put(elm.getKey(), elm.getValue());
+ break;
+ }
+ if(!change)
+ break;
+ }
+ List<int[]> ret = new ArrayList<>();
+
+ for(Queue<Queue<Integer>> x : distToColId.values())
+ for(Queue<Integer> y : x) {
+ int[] g = new int[y.size()];
+ int idx = 0;
+ for(Integer id : y)
+ g[idx++] = id;
+ Arrays.sort(g);
+ ret.add(g);
+ }
+
+ if(LOG.isDebugEnabled()){
+ StringBuilder sb = new StringBuilder();
+ for(int[] cg : ret)
+ sb.append(Arrays.toString(cg));
+ LOG.debug(sb.toString());
+ }
+ return ret;
+ }
+}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/cocode/PlanningCoCoder.java b/src/main/java/org/apache/sysds/runtime/compress/cocode/PlanningCoCoder.java
index c239f7b..6071ecd 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/cocode/PlanningCoCoder.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/cocode/PlanningCoCoder.java
@@ -41,7 +41,7 @@
private static final Log LOG = LogFactory.getLog(PlanningCoCoder.class.getName());
public enum PartitionerType {
- BIN_PACKING, STATIC,
+ BIN_PACKING, STATIC, COST,
}
/**
@@ -69,15 +69,19 @@
HashMap<Integer, GroupableColInfo> groupColsInfo = new HashMap<>();
for(int i = 0; i < numCols; i++) {
int colIx = cols.get(i);
- double cardinality = colGroups[colIx].getEstCard();
- double weight = cardinality / numRows;
+ int cardinality = colGroups[colIx].getEstCard();
+ double weight = ((double)cardinality) / numRows;
groupCols.add(colIx);
- groupColsInfo.put(colIx, new GroupableColInfo(weight, colGroups[colIx].getMinSize()));
+ groupColsInfo.put(colIx, new GroupableColInfo(weight, colGroups[colIx].getMinSize(), cardinality));
}
// use column group partitioner to create partitions of columns
List<int[]> bins = createColumnGroupPartitioner(cs.columnPartitioner)
.partitionColumns(groupCols, groupColsInfo, cs);
+
+ if (cs.columnPartitioner == PartitionerType.COST){
+ return bins;
+ }
// brute force grouping within each partition
return (k > 1) ? getCocodingGroupsBruteForce(bins,
@@ -210,6 +214,8 @@
case STATIC:
return new ColumnGroupPartitionerStatic();
+ case COST:
+ return new ColumnGroupPartitionerCost();
default:
throw new RuntimeException("Unsupported column group partitioner: " + type.toString());
}
@@ -218,10 +224,12 @@
public static class GroupableColInfo {
public final double cardRatio;
public final long size;
+ public final int nrDistinct;
- public GroupableColInfo(double lcardRatio, long lsize) {
+ public GroupableColInfo(double lcardRatio, long lsize, int cardinality) {
cardRatio = lcardRatio;
size = lsize;
+ nrDistinct = cardinality;
}
}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroup.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroup.java
index 28db0bc..7a0e5ae 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroup.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroup.java
@@ -28,6 +28,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.runtime.DMLRuntimeException;
+import org.apache.sysds.runtime.data.SparseRow;
import org.apache.sysds.runtime.matrix.data.IJV;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
@@ -253,16 +254,46 @@
/**
* Multiply the slice of the matrix that this column group represents by a vector on the right.
*
- * @param vector vector to multiply by (tall vector)
- * @param c accumulator for holding the result
- * @param rl row lower
- * @param ru row upper if the internal SystemML code that performs the multiplication experiences an error
+ * @param vector Vector to multiply by (tall vector)
+ * @param c Accumulator for holding the result
+ * @param rl Row to start at
+ * @param ru Row to stop at
* @param dictVals The dictionary values materialized
*/
public abstract void rightMultByVector(double[] vector, double[] c, int rl, int ru, double[] dictVals);
- public abstract void rightMultByMatrix(double[] matrix, double[] result, int numVals, double[] values, int rl,
- int ru, int vOff);
+ /**
+ * Right multiply by matrix. for which the compressed matrix is on the left and the uncompressed is on the right.
+ * Note that there is no b argument, but the b is aggregated into the values needed for assignment and addition into
+ * output.
+ *
+ * @param preAggregatedB The preAggregated values that is to be put into c
+ * @param c The output matrix
+ * @param thatNrColumns The number of columns in B (before aggregation)
+ * @param rl The row index to start the multiplication from
+ * @param ru The row index to stop the multiplication at
+ * @param cl The column index to start from
+ * @param cu The row index to stop at.
+ */
+ public abstract void rightMultByMatrix(double[] preAggregatedB, double[] c, int thatNrColumns, int rl, int ru,
+ int cl, int cu);
+
+ /**
+ * Sparse right multiply by matrix, for which the compressed matrix is on the left and the uncompressed sparse is on
+ * the right. This call differ from the other right multiply by not having a preAggregation phase.
+ *
+ * This should only be called in very sparse situations.
+ *
+ * @param rows The sparse rows
+ * @param c The output matrix linearized
+ * @param numVals The number of values in the dictionary
+ * @param dictVals The materialized dictionary
+ * @param nrColumns The number of columns in the matrix to multiply with and also in the output
+ * @param rl The row index to start at
+ * @param ru The row index to stop at.
+ */
+ public abstract void rightMultBySparseMatrix(SparseRow[] rows, double[] c, int numVals, double[] dictVals,
+ int nrColumns, int rl, int ru);
/**
* Multiply the slice of the matrix that this column group represents by a row vector on the left (the original
@@ -286,8 +317,7 @@
public abstract void leftMultByRowVector(double[] vector, double[] result, int numVals, double[] values);
/**
- * Multiply the slice of the matrix that this column group represents by a row vector on the left (the original
- * column vector is assumed to be transposed already i.e. its size now is 1xn).
+ * Multiply with a matrix on the left.
*
* @param matrix matrix to left multiply
* @param result matrix block result
@@ -303,6 +333,25 @@
int numCols, int rl, int ru, int vOff);
/**
+ * Multiply with a sparse matrix on the left hand side, and add the values to the output result
+ *
+ * @param spNrVals the Number of sparse values (since the number of indexes does not align with number of
+ * values)
+ * @param indexes the indexes for the sparse values in the given row.
+ * @param sparseV the sparse values.
+ * @param result the linearized output matrix
+ * @param numVals the number of values in the dictionary
+ * @param values the dictionary values materialized
+ * @param numRows the number of rows in the left hand side input matrix (the sparse one)
+ * @param numCols the number of columns in the compression.
+ * @param row the row index of the sparse row to multiply with.
+ * @param MaterializedRow The sparse row materialized (should only be done if needed for the specific type of
+ * ColumnGroup)
+ */
+ public abstract void leftMultBySparseMatrix(int spNrVals, int[] indexes, double[] sparseV, double[] result,
+ int numVals, double[] values, int numRows, int numCols, int row, double[] MaterializedRow);
+
+ /**
* Perform the specified scalar operation directly on the compressed column group, without decompressing individual
* cells if possible.
*
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
index 0f4c8aa..1400bea 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC.java
@@ -22,7 +22,6 @@
import java.util.Arrays;
import java.util.Iterator;
-import org.apache.commons.lang.NotImplementedException;
import org.apache.sysds.runtime.compress.CompressionSettings;
import org.apache.sysds.runtime.compress.utils.ABitmap;
import org.apache.sysds.runtime.functionobjects.Builtin;
@@ -86,7 +85,7 @@
int nnz = 0;
for(int i = 0; i < _numRows; i++) {
int index = getIndex(i);
- if(index != values.length) {
+ if(index < getNumValues()) {
nnz += ((c[i] = values[(index) * ncol + colpos]) != 0) ? 1 : 0;
}
else {
@@ -104,10 +103,9 @@
throw new RuntimeException("Column index " + c + " not in DDC group.");
// get value
- int index = getIndex(r, ix);
- if(index != getNumValues()) {
-
- return _dict.getValue(index);
+ int index = getIndex(r);
+ if(index < getNumValues()) {
+ return _dict.getValue(index * _colIndexes.length + ix);
}
else {
return 0.0;
@@ -150,7 +148,7 @@
for(int rix = rl; rix < ru; rix++) {
int index = getIndex(rix);
- if(index != numVals) {
+ if(index < numVals) {
setandExecute(c, kbuff, kplus2, vals[index], rix * (2 + (mean ? 1 : 0)));
}
}
@@ -158,18 +156,18 @@
@Override
protected void computeRowMxx(double[] c, Builtin builtin, int rl, int ru) {
- final int numVals = getNumValues();
int ncol = getNumCols();
double[] dictionary = getValues();
for(int i = rl; i < ru; i++) {
- int rowIndex = getIndex(i);
- if(rowIndex != numVals) {
- for(int j = 0; j < ncol; j++)
- c[i] = builtin.execute(c[i], dictionary[rowIndex + j]);
- }
- else {
- c[i] = builtin.execute(c[i], 0.0);
+ int index = getIndex(i) * ncol;
+ for(int j = 0; j < ncol; j++) {
+ if(index < dictionary.length) {
+ c[i] = builtin.execute(c[i], dictionary[index + j]);
+ }
+ else {
+ c[i] = builtin.execute(c[i], 0.0);
+ }
}
}
}
@@ -180,14 +178,13 @@
public void postScaling(double[] values, double[] vals, double[] c, int numVals, int i, int totalCols) {
final int ncol = getNumCols();
+ int valOff = 0;
- for(int j = 0; j < ncol; j++) {
- int colIx = _colIndexes[j] + i * totalCols;
- for(int k = 0, valOff = 0; k < numVals; k++, valOff += ncol) {
- double aval = vals[k];
- if(valOff != numVals) {
- c[colIx] += aval * values[valOff + j];
- }
+ for(int k = 0; k < numVals; k++) {
+ double aval = vals[k];
+ for(int j = 0; j < ncol; j++) {
+ int colIx = _colIndexes[j] + i * totalCols;
+ c[colIx] += aval * values[valOff++];
}
}
}
@@ -206,20 +203,13 @@
return counts;
}
-
- @Override
- public void rightMultByMatrix(double[] b, double[] c, int numVals, double[] values, int rl, int ru, int vOff){
- throw new NotImplementedException("Not Implemented");
- // final int numCols = getNumCols();
- }
-
-
@Override
public void leftMultByMatrix(double[] a, double[] c, int numVals, double[] values, int numRows, int numCols, int rl,
int ru, int voff) {
- numVals = (numVals == -1) ? getNumValues() : numVals;
+ numVals = (numVals == -1) ? getNumValues() : numVals;
for(int i = rl, j = voff; i < ru; i++, j++) {
+ int offC = i * numCols;
if(8 * numVals < _numRows) {
// iterative over codes and pre-aggregate inputs per code (guaranteed <=255)
// temporary array also avoids false sharing in multi-threaded environments
@@ -227,13 +217,13 @@
postScaling(values, vals, c, numVals, i, numCols);
}
else {
- for(int k = 0, aOff = j *_numRows; k < _numRows; k++, aOff++) {
+ for(int k = 0, aOff = j * _numRows; k < _numRows; k++, aOff++) {
double aval = a[aOff];
if(aval != 0) {
int valOff = getIndex(k) * _colIndexes.length;
- if(valOff != numVals) {
+ if(valOff < numVals) {
for(int h = 0; h < _colIndexes.length; h++) {
- int colIx = _colIndexes[h] + i * numCols;
+ int colIx = _colIndexes[h] + offC;
c[colIx] += aval * values[valOff + h];
}
}
@@ -244,6 +234,23 @@
}
@Override
+ public void leftMultBySparseMatrix(int spNrVals, int[] indexes, double[] sparseV, double[] c, int numVals,
+ double[] values, int numRows, int numCols, int row, double[] MaterializedRow) {
+ numVals = (numVals == -1) ? getNumValues() : numVals;
+ for(int i = 0; i < spNrVals; i++) {
+ int k = indexes[i];
+ double aval = sparseV[i];
+ int valOff = getIndex(k);
+ if(valOff < numVals) {
+ for(int h = 0; h < _colIndexes.length; h++) {
+ int colIx = _colIndexes[h] + row * numCols;
+ c[colIx] += aval * values[valOff * _colIndexes.length + h];
+ }
+ }
+ }
+ }
+
+ @Override
public void leftMultByRowVector(double[] a, double[] result, int numVals) {
numVals = (numVals == -1) ? getNumValues() : numVals;
double[] values = getValues();
@@ -265,24 +272,17 @@
* @return the pre-aggregated values.
*/
public double[] preAggregate(double[] a, int numVals, int aRows) {
- double[] vals;
+ double[] vals = allocDVector(numVals + 1, true);
if(aRows > 0) {
- vals = allocDVector(numVals, true);
- // int off = _numRows * aRows;
for(int i = 0, off = _numRows * aRows; i < _numRows; i++, off++) {
int index = getIndex(i);
- if(index != numVals) { // Since we know that multiplying with 0 is .. 0 don't begin to aggregate.
- vals[index] += a[off];
- }
+ vals[index] += a[off];
}
}
else {
- vals = allocDVector(numVals, true);
for(int i = 0; i < _numRows; i++) {
int index = getIndex(i);
- if(index != numVals) { // Since we know that multiplying with 0 is .. 0 don't begin to aggregate.
- vals[index] += a[i];
- }
+ vals[index] += a[i];
}
}
return vals;
@@ -290,13 +290,15 @@
@Override
public void leftMultByRowVector(double[] a, double[] c, int numVals, double[] values) {
- // double[] c = result.getDenseBlockValues();
+
numVals = (numVals == -1) ? getNumValues() : numVals;
if(8 * numVals < _numRows) {
// iterative over codes and pre-aggregate inputs per code (guaranteed <=255)
// temporary array also avoids false sharing in multi-threaded environments
+
double[] vals = preAggregate(a, numVals);
+
postScaling(values, vals, c, numVals);
}
else {
@@ -305,7 +307,7 @@
double aval = a[i];
if(aval != 0)
for(int j = 0, valOff = getIndex(i) * _colIndexes.length; j < _colIndexes.length; j++)
- if(valOff != numVals) {
+ if(valOff < numVals) {
c[_colIndexes[j]] += aval * values[valOff + j];
}
}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC1.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC1.java
index 7b401d6..25f45ed 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC1.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC1.java
@@ -25,9 +25,11 @@
import java.util.Arrays;
import java.util.HashMap;
+import org.apache.commons.lang.NotImplementedException;
import org.apache.sysds.runtime.compress.CompressionSettings;
import org.apache.sysds.runtime.compress.utils.ABitmap;
import org.apache.sysds.runtime.compress.utils.LinearAlgebraUtils;
+import org.apache.sysds.runtime.data.SparseRow;
import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
/**
@@ -99,6 +101,23 @@
}
@Override
+ public void rightMultByMatrix(double[] preAggregatedB, double[] c, int thatNrColumns, int rl, int ru, int cl, int cu){
+ LinearAlgebraUtils.vectListAddDDC(preAggregatedB, c, _data, rl, ru, cl, cu, thatNrColumns, getNumValues());
+ }
+
+ @Override
+ public void rightMultBySparseMatrix(SparseRow[] rows, double[] c, int numVals, double[] dictVals, int nrColumns,
+ int rl, int ru) {
+ if(rows.length > 1) {
+ throw new NotImplementedException("Not Implemented CoCoded right Sparse Multiply");
+ }
+ for(int i = 0; i < rows[0].size(); i++) {
+ double[] vals = sparsePreaggValues(numVals, rows[0].values()[i], false, dictVals);
+ LinearAlgebraUtils.vectListAdd(vals, c, _data, rl, ru, rows[0].indexes()[i] * _numRows);
+ }
+ }
+
+ @Override
protected int getIndex(int r) {
return _data[r] & 0xFF;
}
@@ -111,13 +130,13 @@
@Override
protected double getData(int r, double[] values) {
int index = (_data[r] & 0xFF);
- return (index == values.length) ? 0.0 : values[index];
+ return (index >= values.length) ? 0.0 : values[index];
}
@Override
protected double getData(int r, int colIx, double[] values) {
int index = (_data[r] & 0xFF) * getNumCols() + colIx;
- return (index == values.length) ? 0.0 : values[index];
+ return (index >= values.length) ? 0.0 : values[index];
}
@Override
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC2.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC2.java
index 78d2d2b..0e8cf04 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC2.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupDDC2.java
@@ -27,6 +27,7 @@
import org.apache.sysds.runtime.compress.CompressionSettings;
import org.apache.sysds.runtime.compress.utils.ABitmap;
import org.apache.sysds.runtime.compress.utils.LinearAlgebraUtils;
+import org.apache.sysds.runtime.data.SparseRow;
import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
/**
@@ -105,7 +106,8 @@
@Override
protected double getData(int r, int colIx, double[] dictionary) {
- return _dict.getValue(_data[r] * getNumCols() + colIx);
+ int index = _data[r] * getNumCols() + colIx;
+ return (index < dictionary.length) ? dictionary[index] : 0.0;
}
@Override
@@ -121,6 +123,20 @@
}
@Override
+ public void rightMultByMatrix(double[] preAggregatedB, double[] c, int thatNrColumns, int rl, int ru, int cl, int cu){
+ LinearAlgebraUtils.vectListAddDDC(preAggregatedB, c, _data, rl, ru, cl, cu, thatNrColumns,getNumValues());
+ }
+
+ @Override
+ public void rightMultBySparseMatrix(SparseRow[] rows, double[] c, int numVals, double[] dictVals, int nrColumns,
+ int rl, int ru) {
+ for(int i = 0; i < rows[0].size(); i++) {
+ double[] vals = sparsePreaggValues(numVals, rows[0].values()[i], false, dictVals);
+ LinearAlgebraUtils.vectListAdd(vals, c, _data, rl, ru, rows[0].indexes()[i] * _numRows);
+ }
+ }
+
+ @Override
public void write(DataOutput out) throws IOException {
super.write(out);
// write data
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java
index 8b55c9c..34ac053 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupFactory.java
@@ -33,6 +33,7 @@
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.compress.BitmapEncoder;
import org.apache.sysds.runtime.compress.CompressionSettings;
+import org.apache.sysds.runtime.compress.cocode.PlanningCoCoder.PartitionerType;
import org.apache.sysds.runtime.compress.colgroup.ColGroup.CompressionType;
import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimator;
import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimatorExact;
@@ -45,6 +46,7 @@
* Factory pattern for constructing ColGroups.
*/
public class ColGroupFactory {
+ // private static final Log LOG = LogFactory.getLog(ColGroupFactory.class.getName());
/**
* The actual compression method, that handles the logic of compressing multiple columns together. This method also
@@ -178,7 +180,8 @@
// Furthermore performance of a compressed representation that does not compress much, is decremental to
// overall performance.
- if(compRatio > 1.0) {
+
+ if(compRatio > 1.0 || compSettings.columnPartitioner == PartitionerType.COST) {
int rlen = compSettings.transposeInput ? in.getNumColumns() : in.getNumRows();
return compress(colIndexes, rlen, ubm, sizeInfo.getBestCompressionType(), compSettings, in);
}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOLE.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOLE.java
index ca7cbf4..8845ce7 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOLE.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupOLE.java
@@ -19,16 +19,15 @@
package org.apache.sysds.runtime.compress.colgroup;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import org.apache.commons.lang.NotImplementedException;
+import org.apache.sysds.runtime.DMLCompressionException;
import org.apache.sysds.runtime.compress.CompressionSettings;
import org.apache.sysds.runtime.compress.utils.ABitmap;
import org.apache.sysds.runtime.compress.utils.LinearAlgebraUtils;
+import org.apache.sysds.runtime.data.SparseRow;
import org.apache.sysds.runtime.functionobjects.Builtin;
import org.apache.sysds.runtime.functionobjects.KahanFunction;
import org.apache.sysds.runtime.functionobjects.KahanPlus;
@@ -43,8 +42,6 @@
public class ColGroupOLE extends ColGroupOffset {
private static final long serialVersionUID = -9157676271360528008L;
- protected int[] _skipList;
-
protected ColGroupOLE() {
super();
}
@@ -59,7 +56,6 @@
*/
protected ColGroupOLE(int[] colIndices, int numRows, ABitmap ubm, CompressionSettings cs) {
super(colIndices, numRows, ubm, cs);
-
// compress the bitmaps
final int numVals = ubm.getNumValues();
char[][] lbitmaps = new char[numVals][];
@@ -68,27 +64,9 @@
lbitmaps[i] = genOffsetBitmap(ubm.getOffsetsList(i).extractValues(), ubm.getNumOffsets(i));
totalLen += lbitmaps[i].length;
}
-
// compact bitmaps to linearized representation
createCompressedBitmaps(numVals, totalLen, lbitmaps);
- _skipList = null;
- if(cs.skipList && numRows > 2 * CompressionSettings.BITMAP_BLOCK_SZ) {
- _skipList = new int[numVals];
- int blksz = CompressionSettings.BITMAP_BLOCK_SZ;
- // _skipList = new int[numVals];
- int rl = (_numRows / 2 / blksz) * blksz;
- for(int k = 0; k < numVals; k++) {
- int boff = _ptr[k];
- int blen = len(k);
- int bix = 0;
- for(int i = 0; i < rl && bix < blen; i += blksz) {
- bix += _data[boff + bix] + 1;
- }
- _skipList[k] = bix;
- }
- }
-
}
protected ColGroupOLE(int[] colIndices, int numRows, boolean zeros, ADictionary dict, char[] bitmaps,
@@ -222,13 +200,9 @@
// Arrays.fill(counts, 0, numVals, 0);
int sum = 0;
for(int k = 0; k < numVals; k++) {
- int boff = _ptr[k];
int blen = len(k);
- // iterate over bitmap blocks and count partial lengths
- int count = 0;
- for(int bix = 0; bix < blen; bix += _data[boff + bix] + 1) {
- count += _data[boff + bix];
- }
+ int blocks = _numRows / CompressionSettings.BITMAP_BLOCK_SZ + 1;
+ int count = blen - blocks;
sum += count;
counts[k] = count;
}
@@ -301,7 +275,10 @@
final int blksz = CompressionSettings.BITMAP_BLOCK_SZ;
final int numVals = getNumValues();
- if(numVals > 1 && _numRows > blksz) {
+ if(rl % blksz != 0)
+ throw new DMLCompressionException("All blocks should be starting at block segments for OLE");
+
+ if(numVals > 1 && _numRows > blksz * 2) {
// since single segment scans already exceed typical L2 cache sizes
// and because there is some overhead associated with blocking, the
// best configuration aligns with L3 cache size (x*vcores*64K*8B < L3)
@@ -328,7 +305,7 @@
int pos = boff + bix + 1;
// compute partial results
- LinearAlgebraUtils.vectAdd(val, c, _data, pos, ii, Math.min(len, ru));
+ LinearAlgebraUtils.vectAdd(val, c, _data, pos, ii, len);
bix += len + 1;
}
@@ -370,9 +347,95 @@
}
@Override
- public void rightMultByMatrix(double[] matrix, double[] result, int numVals, double[] values, int rl, int ru,
- int vOff) {
- throw new NotImplementedException("Not Implemented");
+ public void rightMultByMatrix(double[] preAggregatedB, double[] c, int thatNrColumns, int rl, int ru, int cl,
+ int cu) {
+
+ final int blksz = CompressionSettings.BITMAP_BLOCK_SZ;
+ if(rl % blksz != 0)
+ throw new DMLCompressionException("All blocks should be starting at block segments for OLE");
+ final int nrVals = getNumValues();
+ for(int k = 0; k < nrVals; k++) {
+ // prepare value-to-add for entire value bitmap
+ int boff = _ptr[k];
+ int blen = len(k);
+
+ // iterate over bitmap blocks and add values
+ int bix = skipScanVal(k, rl);
+ ;
+ int off = rl;
+ int slen = 0;
+ // compute partial results
+ for(; bix < blen & off < ru; bix += slen + 1, off += blksz) {
+ slen = _data[boff + bix];
+ for(int blckIx = 1; blckIx <= slen; blckIx++) {
+ int rowIdx = (_data[boff + bix + blckIx] + off) * thatNrColumns;
+ addV(c, preAggregatedB, cl, cu, rowIdx, k);
+ }
+ }
+
+ }
+ }
+
+ private static void addV(final double[] c, final double[] preAggregatedB, final int cl, final int cu,
+ final int rowIdx, final int k) {
+ final int bn = (cu - cl % 8);
+ int n = k * (cu - cl);
+ for(int i = cl + rowIdx; i < cl + bn + rowIdx; i++, n++) {
+ c[i] += preAggregatedB[n];
+ }
+
+ for(int i = cl + bn + rowIdx; i < cu + rowIdx; i += 8, n += 8) {
+ c[i + 0] += preAggregatedB[n + 0];
+ c[i + 1] += preAggregatedB[n + 1];
+ c[i + 2] += preAggregatedB[n + 2];
+ c[i + 3] += preAggregatedB[n + 3];
+ c[i + 4] += preAggregatedB[n + 4];
+ c[i + 5] += preAggregatedB[n + 5];
+ c[i + 6] += preAggregatedB[n + 6];
+ c[i + 7] += preAggregatedB[n + 7];
+ }
+ }
+
+ @Override
+ public void rightMultBySparseMatrix(SparseRow[] rows, double[] c, int numVals, double[] dictVals, int nrColumns,
+ int rl, int ru) {
+ final int blksz = CompressionSettings.BITMAP_BLOCK_SZ;
+ if(rows.length > 1) {
+ throw new NotImplementedException("Not Implemented CoCoded right Sparse Multiply");
+ }
+
+ for(int k = 0; k < numVals; k++) {
+ // prepare value-to-add for entire value bitmap
+ int boff = _ptr[k];
+ int blen = len(k);
+ for(int i = 0; i < rows[0].size(); i++) {
+ int column = rows[0].indexes()[i];
+ double val = sumValuesSparse(k, rows, dictVals, i);
+
+ // iterate over bitmap blocks and add values
+ if(val != 0) {
+ int bix = 0;
+ int off = 0 + column * _numRows;
+ int slen = -1;
+
+ // scan to beginning offset if necessary
+ if(rl > 0) {
+ for(; bix < blen & off < rl + column * _numRows; bix += slen + 1, off += blksz) {
+ slen = _data[boff + bix];
+ }
+ }
+
+ // compute partial results
+ for(; bix < blen & off < ru + column * _numRows; bix += slen + 1, off += blksz) {
+ slen = _data[boff + bix];
+ for(int blckIx = 1; blckIx <= slen; blckIx++) {
+ c[off + _data[boff + bix + blckIx]] += val;
+ }
+ }
+ }
+ }
+ }
+
}
@Override
@@ -450,7 +513,6 @@
public void leftMultByMatrix(double[] a, double[] c, int numVals, double[] values, int numRows, int numCols, int rl,
int ru, int voff) {
final int blksz = CompressionSettings.BITMAP_BLOCK_SZ;
- final int thisNumCols = getNumCols();
if(numVals >= 1 && _numRows > blksz) {
@@ -460,10 +522,10 @@
// step 1: prepare position and value arrays
// current pos per OLs / output values
- int[] apos = allocIVector(numVals, true);
- double[] cvals = allocDVector(numVals, true);
for(int i = rl, off = voff * _numRows; i < ru; i++, off += _numRows) {
+ int[] apos = allocIVector(numVals, true);
+ double[] cvals = allocDVector(numVals, true);
// step 2: cache conscious matrix-vector via horizontal scans
for(int ai = 0; ai < _numRows; ai += blksz2) {
int aimax = Math.min(ai + blksz2, _numRows);
@@ -472,7 +534,7 @@
for(int k = 0; k < numVals; k++) {
int boff = _ptr[k];
int blen = len(k);
- int bix = apos[k] + off;
+ int bix = apos[k];
double vsum = 0;
for(int ii = ai; ii < aimax && bix < blen; ii += blksz) {
@@ -490,10 +552,11 @@
}
}
+ int offC = i * numCols;
// step 3: scale partial results by values and write to global output
- for(int k = 0, valOff = 0; k < numVals; k++, valOff += thisNumCols)
- for(int j = 0; j < thisNumCols; j++) {
- int colIx = _colIndexes[j] + i * numCols;
+ for(int k = 0, valOff = 0; k < numVals; k++, valOff += _colIndexes.length)
+ for(int j = 0; j < _colIndexes.length; j++) {
+ int colIx = _colIndexes[j] + offC;
c[colIx] += cvals[k] * values[valOff + j];
}
}
@@ -501,7 +564,7 @@
else {
for(int i = rl, offR = voff * _numRows; i < ru; i++, offR += _numRows) {
- for(int k = 0, valOff = 0; k < numVals; k++, valOff += thisNumCols) {
+ for(int k = 0, valOff = 0; k < numVals; k++, valOff += _colIndexes.length) {
int boff = _ptr[k];
int blen = len(k);
@@ -512,8 +575,9 @@
// scale partial results by values and write results
- for(int j = 0; j < thisNumCols; j++) {
- int colIx = _colIndexes[j] + i * numCols;
+ int offC = i * numCols;
+ for(int j = 0; j < _colIndexes.length; j++) {
+ int colIx = _colIndexes[j] + offC;
c[colIx] += vsum * values[valOff + j];
}
}
@@ -521,10 +585,86 @@
}
}
- // @Override
- // public void leftMultByRowVector(double[] a, double[] c, int numVals, byte[] values) {
- // throw new NotImplementedException("Not Implemented Byte fore OLE");
- // }
+ @Override
+ public void leftMultBySparseMatrix(int spNrVals, int[] indexes, double[] sparseV, double[] c, int numVals,
+ double[] values, int numRows, int numCols, int row, double[] tmpA) {
+ final int blksz = CompressionSettings.BITMAP_BLOCK_SZ;
+
+ if(numVals >= 1 && _numRows > blksz) {
+
+ // cache blocking config (see matrix-vector mult for explanation)
+ final int blksz2 = 2 * CompressionSettings.BITMAP_BLOCK_SZ;
+
+ // step 1: prepare position and value arrays
+ int[] apos = allocIVector(numVals, true);
+ double[] cvals = allocDVector(numVals, true);
+ // step 2: cache conscious matrix-vector via horizontal scans
+ int pI = 0;
+ for(int ai = 0; ai < _numRows; ai += blksz2) {
+ int aimax = Math.min(ai + blksz2, _numRows);
+
+ for(int i = 0; i < blksz2; i++) {
+ tmpA[i] = 0;
+ }
+
+ for(; pI < spNrVals && indexes[pI] < aimax; pI++) {
+ if(indexes[pI] >= ai)
+ tmpA[indexes[pI] - ai] = sparseV[pI];
+ }
+
+ // horizontal segment scan, incl pos maintenance
+ for(int k = 0; k < numVals; k++) {
+ int boff = _ptr[k];
+ int blen = len(k);
+ int bix = apos[k];
+ double vsum = 0;
+ for(int ii = ai; ii < aimax && bix < blen; ii += blksz) {
+ int len = _data[boff + bix];
+ int pos = boff + bix + 1;
+ int blockId = (ii / blksz) % 2;
+ vsum += LinearAlgebraUtils.vectSum(tmpA, _data, blockId * blksz, pos, len);
+ bix += len + 1;
+ }
+
+ apos[k] = bix;
+ cvals[k] += vsum;
+ }
+ }
+
+ int offC = row * numCols;
+ // step 3: scale partial results by values and write to global output
+ for(int k = 0, valOff = 0; k < numVals; k++, valOff += _colIndexes.length)
+ for(int j = 0; j < _colIndexes.length; j++) {
+ int colIx = _colIndexes[j] + offC;
+ c[colIx] += cvals[k] * values[valOff + j];
+ }
+
+ }
+ else {
+ for(int k = 0, valOff = 0; k < numVals; k++, valOff += _colIndexes.length) {
+ int boff = _ptr[k];
+ int blen = len(k);
+ double vsum = 0;
+ int pI = 0;
+ for(int bix = 0, off = 0; bix < blen; bix += _data[boff + bix] + 1, off += blksz) {
+ // blockId = off / blksz;
+ for(int i = 0; i < blksz; i++) {
+ tmpA[i] = 0;
+ }
+ for(; pI < spNrVals && indexes[pI] < off + blksz; pI++) {
+ if(indexes[pI] >= off)
+ tmpA[indexes[pI] - off] = sparseV[pI];
+ }
+ vsum += LinearAlgebraUtils.vectSum(tmpA, _data, 0, boff + bix + 1, _data[boff + bix]);
+ }
+
+ for(int j = 0; j < _colIndexes.length; j++) {
+ int Voff = _colIndexes[j] + row * numCols;
+ c[Voff] += vsum * values[valOff + j];
+ }
+ }
+ }
+ }
@Override
protected final void computeSum(double[] c, KahanFunction kplus) {
@@ -714,13 +854,11 @@
final int blksz = CompressionSettings.BITMAP_BLOCK_SZ;
if(rl > 0) { // rl aligned with blksz
- int rskip = (_numRows / 2 / blksz) * blksz;
-
for(int k = 0; k < numVals; k++) {
int boff = _ptr[k];
int blen = len(k);
- int start = (rl >= rskip) ? rskip : 0;
- int bix = (rl >= rskip) ? _skipList[k] : 0;
+ int start = 0;
+ int bix = 0;
for(int i = start; i < rl && bix < blen; i += blksz) {
bix += _data[boff + bix] + 1;
}
@@ -735,11 +873,10 @@
final int blksz = CompressionSettings.BITMAP_BLOCK_SZ;
if(rl > 0) { // rl aligned with blksz
- int rskip = (_numRows / 2 / blksz) * blksz;
int boff = _ptr[k];
int blen = len(k);
- int start = (rl >= rskip) ? rskip : 0;
- int bix = (rl >= rskip) ? _skipList[k] : 0;
+ int start = 0;
+ int bix = 0;
for(int i = start; i < rl && bix < blen; i += blksz) {
bix += _data[boff + bix] + 1;
}
@@ -750,48 +887,6 @@
}
@Override
- public void readFields(DataInput in) throws IOException {
- super.readFields(in);
- boolean skiplistNull = in.readBoolean();
- if(!skiplistNull) {
- _skipList = new int[in.readInt()];
- for(int i = 0; i < _skipList.length; i++) {
- _skipList[i] = in.readInt();
- }
- }
- else {
- _skipList = null;
- }
-
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- super.write(out);
- if(_skipList != null) {
- out.writeBoolean(false);
- out.writeInt(_skipList.length);
- for(int i = 0; i < _skipList.length; i++) {
- out.writeInt(_skipList[i]);
- }
- }
- else {
- out.writeBoolean(true);
- }
- }
-
- @Override
- public long getExactSizeOnDisk() {
- long ret = super.getExactSizeOnDisk();
- ret += 1; // in case skip list is null.
- if(_skipList != null) {
- ret += 4; // skiplist length
- ret += 4 * _skipList.length;
- }
- return ret;
- }
-
- @Override
public Iterator<Integer> getIterator(int k) {
return new OLEValueIterator(k, 0, _numRows);
}
@@ -810,14 +905,6 @@
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(super.toString());
- if(_skipList != null) {
- sb.append(String.format("\n%15s%5d ", "SkipList:", this._skipList.length));
- sb.append(Arrays.toString(this._skipList));
- }
- else {
- sb.append("skiplist empty");
- }
-
return sb.toString();
}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupRLE.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupRLE.java
index a61b26c..1ce23d5 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupRLE.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupRLE.java
@@ -28,6 +28,7 @@
import org.apache.sysds.runtime.compress.CompressionSettings;
import org.apache.sysds.runtime.compress.utils.ABitmap;
import org.apache.sysds.runtime.compress.utils.LinearAlgebraUtils;
+import org.apache.sysds.runtime.data.SparseRow;
import org.apache.sysds.runtime.functionobjects.Builtin;
import org.apache.sysds.runtime.functionobjects.KahanFunction;
import org.apache.sysds.runtime.functionobjects.KahanPlus;
@@ -59,6 +60,7 @@
final int numVals = ubm.getNumValues();
char[][] lbitmaps = new char[numVals][];
int totalLen = 0;
+
for(int k = 0; k < numVals; k++) {
lbitmaps[k] = genRLEBitmap(ubm.getOffsetsList(k).extractValues(), ubm.getNumOffsets(k));
totalLen += lbitmaps[k].length;
@@ -261,7 +263,7 @@
}
@Override
- public void rightMultByVector(double[] b , double[] c, int rl, int ru, double[] dictVals) {
+ public void rightMultByVector(double[] b, double[] c, int rl, int ru, double[] dictVals) {
final int numVals = getNumValues();
if(numVals >= 1 && _numRows > CompressionSettings.BITMAP_BLOCK_SZ) {
@@ -273,12 +275,12 @@
// step 1: prepare position and value arrays
// current pos / values per RLE list
- int[] astart = new int[numVals];
- int[] apos = skipScan(numVals, rl, astart);
- double[] aval = preaggValues(numVals, b,dictVals);
// step 2: cache conscious matrix-vector via horizontal scans
for(int bi = rl; bi < ru; bi += blksz) {
+ int[] astart = new int[numVals];
+ int[] apos = skipScan(numVals, rl, astart);
+ double[] aval = preaggValues(numVals, b, dictVals);
int bimax = Math.min(bi + blksz, ru);
// horizontal segment scan, incl pos maintenance
@@ -346,8 +348,89 @@
}
@Override
- public void rightMultByMatrix(double[] matrix, double[] result, int numVals, double[] values, int rl, int ru, int vOff){
- throw new NotImplementedException("Not Implemented");
+ public void rightMultByMatrix(double[] preAggregatedB, double[] c, int thatNrColumns, int rl, int ru, int cl,
+ int cu) {
+ final int nrVals = getNumValues();
+ for(int k = 0; k < nrVals; k++) {
+ int boff = _ptr[k];
+ int blen = len(k);
+ int bix = 0;
+ int start = 0;
+
+ // scan to beginning offset if necessary
+ if(rl > 0) { // rl aligned with blksz
+ while(bix < blen) {
+ int lstart = _data[boff + bix]; // start
+ int llen = _data[boff + bix + 1]; // len
+ if(start + lstart + llen >= rl)
+ break;
+ start += lstart + llen;
+ bix += 2;
+ }
+ }
+ // compute partial results, not aligned
+ while(bix < blen) {
+ int lstart = _data[boff + bix];
+ int llen = _data[boff + bix + 1];
+ LinearAlgebraUtils.vectListAdd(preAggregatedB,
+ c,
+ Math.max(rl, start + lstart),
+ Math.min(start + lstart + llen, ru),
+ cl,
+ cu,
+ thatNrColumns,
+ k * (cu - cl));
+ if(start + lstart + llen >= ru)
+ break;
+ start += lstart + llen;
+ bix += 2;
+ }
+ }
+ }
+
+ @Override
+ public void rightMultBySparseMatrix(SparseRow[] rows, double[] c, int numVals, double[] dictVals, int nrColumns,
+ int rl, int ru) {
+ if(rows.length > 1) {
+ throw new NotImplementedException("Not Implemented CoCoded right Sparse Multiply");
+ }
+ for(int k = 0; k < numVals; k++) {
+ int boff = _ptr[k];
+ int blen = len(k);
+ for(int i = 0; i < rows[0].size(); i++) {
+ int column = rows[0].indexes()[i];
+ double val = sumValuesSparse(k, rows, dictVals, i);
+ int bix = 0;
+ int start = 0 + column * _numRows;
+
+ // scan to beginning offset if necessary
+ if(rl > 0) { // rl aligned with blksz
+ while(bix < blen) {
+ int lstart = _data[boff + bix]; // start
+ int llen = _data[boff + bix + 1]; // len
+ if(start + lstart + llen >= rl + column * _numRows)
+ break;
+ start += lstart + llen;
+ bix += 2;
+ }
+ }
+
+ // compute partial results, not aligned
+ while(bix < blen) {
+ int lstart = _data[boff + bix];
+ int llen = _data[boff + bix + 1];
+ LinearAlgebraUtils.vectAdd(val,
+ c,
+ Math.max(rl + column * _numRows, start + lstart),
+ Math.min(start + lstart + llen, ru + column * _numRows) -
+ Math.max(rl + column * _numRows, start + lstart));
+ if(start + lstart + llen >= ru + column * _numRows)
+ break;
+ start += lstart + llen;
+ bix += 2;
+ }
+ }
+ }
}
@Override
@@ -425,18 +508,15 @@
}
@Override
- public void leftMultByMatrix(double[] a, double[] c, int numVals, double[] values, int numRows, int numCols, int rl,
- int ru, int voff) {
- // throw new NotImplementedException();
- final int thisNumCols = getNumCols();
-
+ public void leftMultByMatrix(final double[] a, final double[] c, final int numVals, final double[] values,
+ final int numRows, final int numCols, int rl, final int ru, final int vOff) {
+
if(numVals >= 1 && _numRows > CompressionSettings.BITMAP_BLOCK_SZ) {
final int blksz = 2 * CompressionSettings.BITMAP_BLOCK_SZ;
- // double[] aRow = new double[a.length / numRows];
// step 1: prepare position and value arrays
int[] astart = new int[numVals];
- for(int i = rl, off = voff * _numRows; i < ru; i++, off += _numRows) {
+ for(int i = rl, off = vOff * _numRows; i < ru; i++, off += _numRows) {
// System.arraycopy(a, (a.length / numRows) * i, aRow, 0, a.length / numRows);
// current pos per OLs / output values
int[] apos = allocIVector(numVals, true);
@@ -454,7 +534,7 @@
int start = astart[k];
// compute partial results, not aligned
- while(bix < blen & start + off < aimax) {
+ while(bix < blen & start < aimax) {
start += _data[boff + bix];
int len = _data[boff + bix + 1];
cvals[k] += LinearAlgebraUtils.vectSum(a, start + off, len);
@@ -466,20 +546,24 @@
astart[k] = start;
}
}
-
+ int offC = i * numCols;
// step 3: scale partial results by values and write to global output
- for(int k = 0, valOff = 0; k < numVals; k++, valOff += thisNumCols)
- for(int j = 0; j < thisNumCols; j++){
-
- int colIx = _colIndexes[j] + i * numCols;
- c[colIx] += cvals[k] * values[valOff + j];
+ int valOff = 0;
+ for(int k = 0; k < numVals; k++) {
+ for(int j = 0; j < _colIndexes.length; j++) {
+ int colIx = _colIndexes[j] + offC;
+ c[colIx] += cvals[k] * values[valOff++];
}
+ astart[k] = 0;
+ }
}
}
else {
// iterate over all values and their bitmaps
- for(int i = rl, off = voff * _numRows; i < ru; i++, off += _numRows) {
- for(int k = 0, valOff = 0; k < numVals; k++, valOff += thisNumCols) {
+ for(int i = rl, off = vOff * _numRows; i < ru; i++, off += _numRows) {
+ int offC = i * numCols;
+ int valOff = 0;
+ for(int k = 0; k < numVals; k++) {
int boff = _ptr[k];
int blen = len(k);
@@ -492,10 +576,10 @@
curRunEnd = curRunStartOff + curRunLen;
}
- for(int j = 0; j < thisNumCols; j++) {
- int colIx = _colIndexes[j] + i * numCols;
+ for(int j = 0; j < _colIndexes.length; j++) {
+ int colIx = _colIndexes[j] + offC;
// scale partial results by values and write results
- c[colIx] += vsum * values[valOff + j];
+ c[colIx] += vsum * values[valOff++];
}
}
}
@@ -503,6 +587,41 @@
}
@Override
+ public void leftMultBySparseMatrix(int spNrVals, int[] indexes, double[] sparseV, double[] c, int numVals,
+ double[] values, int numRows, int numCols, int row, double[] MaterializedRow) {
+ for(int k = 0, valOff = 0; k < numVals; k++, valOff += _colIndexes.length) {
+ int boff = _ptr[k];
+ int blen = len(k);
+
+ double vsum = 0;
+ int pointerIndexes = 0;
+ int curRunEnd = 0;
+ for(int bix = 0; bix < blen; bix += 2) {
+ int curRunStartOff = curRunEnd + _data[boff + bix];
+ int curRunLen = _data[boff + bix + 1];
+ curRunEnd = curRunStartOff + curRunLen;
+ while(pointerIndexes < spNrVals && indexes[pointerIndexes] < curRunStartOff) {
+ pointerIndexes++;
+ }
+ while(pointerIndexes != spNrVals && indexes[pointerIndexes] >= curRunStartOff &&
+ indexes[pointerIndexes] < curRunEnd) {
+ vsum += sparseV[pointerIndexes];
+ pointerIndexes++;
+ }
+ if(pointerIndexes == spNrVals) {
+ break;
+ }
+ }
+
+ for(int j = 0; j < _colIndexes.length; j++) {
+ int Voff = _colIndexes[j] + row * numCols;
+ c[Voff] += vsum * values[valOff + j];
+ }
+ }
+
+ }
+
+ @Override
public ColGroup scalarOperation(ScalarOperator op) {
double val0 = op.executeScalar(0);
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java
index 6dceb68..47c1f0f 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupUncompressed.java
@@ -31,6 +31,7 @@
import org.apache.sysds.runtime.compress.CompressionSettings;
import org.apache.sysds.runtime.data.SparseBlock;
import org.apache.sysds.runtime.data.SparseBlock.Type;
+import org.apache.sysds.runtime.data.SparseRow;
import org.apache.sysds.runtime.functionobjects.ReduceRow;
import org.apache.sysds.runtime.matrix.data.IJV;
import org.apache.sysds.runtime.matrix.data.LibMatrixAgg;
@@ -270,28 +271,37 @@
shortVector.recomputeNonZeros();
// Multiply the selected columns by the appropriate parts of the vector
- LibMatrixMult.matrixMult(_data, shortVector, result,rl,ru);
+ LibMatrixMult.matrixMult(_data, shortVector, result, rl, ru);
}
- public void rightMultByMatrix(MatrixBlock vector, MatrixBlock result, int rl, int ru) {
+ public void rightMultByMatrix(MatrixBlock matrix, MatrixBlock result, int rl, int ru) {
// Pull out the relevant rows of the vector
- // int clen = _colIndexes.length;
+
+ int clen = _colIndexes.length;
+ MatrixBlock subMatrix = new MatrixBlock(clen, matrix.getNumColumns(), false);
+ subMatrix.allocateDenseBlock();
+ double[] b = subMatrix.getDenseBlockValues();
+
+ for(int colIx = 0; colIx < clen; colIx++){
+ int row = _colIndexes[colIx];
+ for(int col = 0; col < matrix.getNumColumns(); col++)
+ b[colIx * matrix.getNumColumns() + col] = matrix.quickGetValue(row, col);
+ }
- // MatrixBlock subMatrix = new MatrixBlock(clen, vector.getNumColumns(), false);
- // subMatrix.allocateDenseBlock();
- // double[] b = subMatrix.getDenseBlockValues();
- // TODO Fix, to copy correctly
- throw new NotImplementedException("Dense right block uncompressed column multiplication not implemented yet.");
- // for(int colIx = 0; colIx < clen; colIx++)
- // b[colIx] = vector.quickGetValue(_colIndexes[colIx], 0);
- // subMatrix.recomputeNonZeros();
+ subMatrix.setNonZeros(clen * matrix.getNumColumns());
// // Multiply the selected columns by the appropriate parts of the vector
- // LibMatrixMult.matrixMult(_data, subMatrix, result);
+ LibMatrixMult.matrixMult(_data, subMatrix, result);
+ }
+
+ public void rightMultByMatrix(double[] preAggregatedB, double[] c, int thatNrColumns, int rl, int ru, int cl,
+ int cu) {
+ throw new NotImplementedException("Should not be called use other matrix function for uncompressed columns");
}
@Override
- public void rightMultByMatrix(double[] matrix, double[] result, int numVals, double[] values, int rl, int ru, int vOff){
+ public void rightMultBySparseMatrix(SparseRow[] rows, double[] c, int numVals, double[] dictVals, int nrColumns,
+ int rl, int ru) {
throw new NotImplementedException("Should not be called use other matrix function for uncompressed columns");
}
@@ -306,10 +316,16 @@
}
@Override
- public void leftMultByMatrix(double[] vector, double[] c, int numVals, double[] values, int numRows, int numCols, int rl, int ru, int vOff) {
+ public void leftMultByMatrix(double[] vector, double[] c, int numVals, double[] values, int numRows, int numCols,
+ int rl, int ru, int vOff) {
throw new NotImplementedException("Should not be called use other matrix function for uncompressed columns");
}
+ @Override
+ public void leftMultBySparseMatrix(int spNrVals, int[] indexes, double[] sparseV, double[] c, int numVals,
+ double[] values, int numRows, int numCols, int row, double[] MaterializedRow) {
+ throw new NotImplementedException("Should not be called use other matrix function for uncompressed columns");
+ }
public void leftMultByMatrix(MatrixBlock matrix, MatrixBlock result) {
MatrixBlock pret = new MatrixBlock(matrix.getNumRows(), _colIndexes.length, false);
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java
index 2a76948..101308e 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/ColGroupValue.java
@@ -29,6 +29,8 @@
import org.apache.sysds.runtime.compress.utils.ABitmap;
import org.apache.sysds.runtime.compress.utils.Bitmap;
import org.apache.sysds.runtime.compress.utils.BitmapLossy;
+import org.apache.sysds.runtime.data.SparseBlock;
+import org.apache.sysds.runtime.data.SparseRow;
import org.apache.sysds.runtime.functionobjects.Builtin;
import org.apache.sysds.runtime.functionobjects.Builtin.BuiltinCode;
import org.apache.sysds.runtime.functionobjects.KahanFunction;
@@ -115,7 +117,7 @@
}
public byte[] getByteValues() {
- return ((QDictionary)_dict).getValuesByte();
+ return ((QDictionary) _dict).getValuesByte();
}
@Override
@@ -189,25 +191,124 @@
return val;
}
- protected final double[] preaggValues(int numVals, double[] b, double[] dictVals) {
- return preaggValues(numVals, b, false, dictVals);
+ protected final double sumValues(int valIx, double[] b, double[] dictVals, int off) {
+ final int numCols = getNumCols();
+ final int valOff = valIx * numCols;
+ double val = 0;
+ for(int i = 0; i < numCols; i++)
+ val += dictVals[valOff + i] * b[_colIndexes[i] + off];
+ return val;
}
- protected final double[] preaggValues(int numVals, double[] b, boolean allocNew, double[] dictVals) {
+ protected final double sumValuesSparse(int valIx, SparseRow[] rows, double[] dictVals, int rowsIndex) {
+ final int numCols = getNumCols();
+ final int valOff = valIx * numCols;
+ double val = 0;
+ for(int i = 0; i < numCols; i++) {
+ val += dictVals[valOff + i] * rows[i].values()[rowsIndex];
+ }
+ return val;
+ }
+
+ protected final double[] preaggValues(int numVals, double[] b, double[] dictVals) {
+ return preaggValues(numVals, b, false, dictVals, 0);
+ }
+
+ protected final double[] preaggValues(int numVals, double[] b, double[] dictVals, int off) {
+ return preaggValues(numVals, b, false, dictVals, off);
+ }
+
+ protected final double[] preaggValues(int numVals, double[] b, boolean allocNew, double[] dictVals, int off) {
// + 1 to enable containing a zero value. which we have added at the length of the arrays index.
- double[] ret = allocNew ? new double[numVals +1 ] : allocDVector(numVals +1, false);
- if(_colIndexes.length == 1){
+ double[] ret = allocNew ? new double[numVals + 1] : allocDVector(numVals + 1, true);
+
+ if(_colIndexes.length == 1) {
for(int k = 0; k < numVals; k++)
- ret[k] = dictVals[k] * b[_colIndexes[0]];
- }else{
+ ret[k] = dictVals[k] * b[_colIndexes[0] + off];
+ }
+ else {
for(int k = 0; k < numVals; k++)
- ret[k] = sumValues(k, b, dictVals);
+ ret[k] = sumValues(k, b, dictVals, off);
}
return ret;
}
/**
+ * Aggregates a double array, that contains the values to add to the output matrix.
+ *
+ * Used in right mult by dense matrix
+ *
+ * @param numVals The number of values contained in the dictionary.
+ * @param b The matrix to multiply with
+ * @param dictVals The values contained in the dictionary materialized as doubles
+ * @param cl Lower column index to aggregate from
+ * @param cu Upper column index to aggregate to
+ * @param cut The total number of columns in b.
+ * @return The aggregated matrix output. Note this has to be mapped to the output matrix.
+ */
+ public double[] preaggValues(final int numVals, final double[] b, double[] dictVals, final int cl, final int cu,
+ final int cut) {
+
+ final int retRows = (cu - cl);
+ final int retCols = (numVals);
+ final double[] ret = allocDVector(retCols * retRows, true);
+ for(int k = 0, off = 0; k < numVals * _colIndexes.length; k += _colIndexes.length, off += retRows) {
+ for(int h = 0; h < _colIndexes.length; h++) {
+ int idb = _colIndexes[h] * cut;
+ double v = dictVals[k + h];
+ // TODO: Test if filtering out 0 here is beneficial.
+ // TODO: utilize dictionary quantisation here and dont materialize dictVals beforehand.
+ for(int i = cl, n = off; i < cu; i++, n += 1) {
+ ret[n] += v * b[idb + i];
+ }
+
+ }
+ }
+
+ return ret;
+ }
+
+ public double[] preaggValues(final int numVals, final SparseBlock b, double[] dictVals, final int cl, final int cu,
+ final int cut) {
+
+ final int retRows = (cu - cl);
+ final int retCols = (numVals);
+ final double[] ret = allocDVector(retCols * retRows, true);
+ for(int h = 0; h < _colIndexes.length; h++) {
+ SparseRow row = b.get(_colIndexes[h]);
+ // SparseRow row = b[_colIndexes[h]];
+ for(int i = 0; i < row.size(); i++) {
+ double v = row.values()[i];
+ for(int k = h, off = row.indexes()[i];
+ k < numVals * _colIndexes.length;
+ k += _colIndexes.length, off += retRows) {
+ ret[off] += dictVals[k] * v;
+ }
+ }
+ }
+ return ret;
+ }
+
+ protected final double[] preaggValue(int k, double[] b, double[] dictVals, int cl, int cu, int cut) {
+ double[] ret = allocDVector(cu - cl, true);
+ for(int h = 0; h < _colIndexes.length; h++) {
+ for(int i = cl, n = 0; i < cu; i++, n++) {
+ ret[n] = dictVals[k + h] * b[_colIndexes[h] * cut + i];
+ }
+ }
+ return ret;
+ }
+
+ protected final double[] sparsePreaggValues(int numVals, double v, boolean allocNew, double[] dictVals) {
+ double[] ret = allocNew ? new double[numVals + 1] : allocDVector(numVals + 1, true);
+
+ for(int k = 0; k < numVals; k++)
+ ret[k] = dictVals[k] * v;
+ return ret;
+ }
+
+ /**
* Compute the Max or other equivalent operations.
*
* NOTE: Shared across OLE/RLE/DDC because value-only computation.
@@ -232,8 +333,11 @@
*/
protected void computeColMxx(double[] c, Builtin builtin) {
if(_zeros) {
- for(int x = 0; x < _colIndexes.length; x++) {
- c[_colIndexes[x]] = builtin.execute(c[_colIndexes[x]], 0);
+ if(_colIndexes.length == 1) {
+
+ for(int x = 0; x < _colIndexes.length; x++) {
+ c[_colIndexes[x]] = builtin.execute(c[_colIndexes[x]], 0);
+ }
}
}
_dict.aggregateCols(c, builtin, _colIndexes);
@@ -273,10 +377,12 @@
@Override
public void unaryAggregateOperations(AggregateUnaryOperator op, double[] c, int rl, int ru) {
// sum and sumsq (reduceall/reducerow over tuples and counts)
- if(op.aggOp.increOp.fn instanceof KahanPlus || op.aggOp.increOp.fn instanceof KahanPlusSq || op.aggOp.increOp.fn instanceof Mean) {
- KahanFunction kplus = (op.aggOp.increOp.fn instanceof KahanPlus || op.aggOp.increOp.fn instanceof Mean) ? KahanPlus
- .getKahanPlusFnObject() : KahanPlusSq.getKahanPlusSqFnObject();
- boolean mean = op.aggOp.increOp.fn instanceof Mean;
+ if(op.aggOp.increOp.fn instanceof KahanPlus || op.aggOp.increOp.fn instanceof KahanPlusSq ||
+ op.aggOp.increOp.fn instanceof Mean) {
+ KahanFunction kplus = (op.aggOp.increOp.fn instanceof KahanPlus ||
+ op.aggOp.increOp.fn instanceof Mean) ? KahanPlus
+ .getKahanPlusFnObject() : KahanPlusSq.getKahanPlusSqFnObject();
+ boolean mean = op.aggOp.increOp.fn instanceof Mean;
if(op.indexFn instanceof ReduceAll)
computeSum(c, kplus);
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/Dictionary.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/Dictionary.java
index 7d43c97..230c121 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/Dictionary.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/Dictionary.java
@@ -24,6 +24,8 @@
import java.io.IOException;
import java.util.Arrays;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.sysds.runtime.functionobjects.Builtin;
import org.apache.sysds.runtime.functionobjects.KahanFunction;
import org.apache.sysds.runtime.functionobjects.KahanPlus;
@@ -38,6 +40,7 @@
*/
public class Dictionary extends ADictionary {
+ protected static final Log LOG = LogFactory.getLog(Dictionary.class.getName());
private final double[] _values;
public Dictionary(double[] values) {
@@ -51,7 +54,7 @@
@Override
public double getValue(int i) {
- return _values[i];
+ return (i >= _values.length) ? 0.0 : _values[i];
}
@Override
@@ -171,14 +174,16 @@
@Override
protected void colSum(double[] c, int[] counts, int[] colIndexes, KahanFunction kplus) {
KahanObject kbuff = new KahanObject(0, 0);
- for(int k = 0, valOff = 0; k < _values.length; k++, valOff += colIndexes.length) {
+ int valOff = 0;
+ final int rows = c.length/2;
+ for(int k = 0; k < _values.length / colIndexes.length; k++) {
int cntk = counts[k];
for(int j = 0; j < colIndexes.length; j++) {
- kbuff.set(c[colIndexes[j]], c[colIndexes[j] + colIndexes.length]);
- // int index = getIndex();
- kplus.execute3(kbuff, getValue(valOff + j), cntk);
+ kbuff.set(c[colIndexes[j]], c[colIndexes[j] + rows]);
+ kbuff.set(c[colIndexes[j]], 0);
+ kplus.execute3(kbuff, getValue(valOff++), cntk);
c[colIndexes[j]] = kbuff._sum;
- c[colIndexes[j] + colIndexes.length] = kbuff._correction;
+ c[colIndexes[j] + rows] = kbuff._correction;
}
}
@@ -187,12 +192,22 @@
@Override
protected double sum(int[] counts, int ncol, KahanFunction kplus) {
KahanObject kbuff = new KahanObject(0, 0);
- for(int k = 0, valOff = 0; k < _values.length; k++, valOff += ncol) {
- int cntk = counts[k];
+ int valOff = 0;
+ for(int k = 0; k < _values.length / ncol; k++) {
+ int countK = counts[k];
for(int j = 0; j < ncol; j++) {
- kplus.execute3(kbuff, getValue(valOff + j), cntk);
+ kplus.execute3(kbuff, getValue(valOff++), countK);
}
}
return kbuff._sum;
}
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+
+ sb.append("Dictionary: " + hashCode());
+ sb.append("\n " + Arrays.toString(_values));
+ return sb.toString();
+ }
}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/colgroup/QDictionary.java b/src/main/java/org/apache/sysds/runtime/compress/colgroup/QDictionary.java
index 9cccf11..19c65fc 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/colgroup/QDictionary.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/colgroup/QDictionary.java
@@ -69,7 +69,7 @@
@Override
public double getValue(int i) {
- return (i == _values.length) ? 0.0 : _values[i] * _scale;
+ return (i >= _values.length) ? 0.0 : _values[i] * _scale;
}
public byte getValueByte(int i) {
@@ -250,12 +250,15 @@
@Override
protected void colSum(double[] c, int[] counts, int[] colIndexes, KahanFunction kplus) {
+
+ final int rows = c.length/2;
if(!(kplus instanceof KahanPlusSq)) {
int[] sum = new int[colIndexes.length];
- for(int k = 0, valOff = 0; k < _values.length; k++, valOff += colIndexes.length) {
+ int valOff = 0;
+ for(int k = 0; k < _values.length/ colIndexes.length; k++) {
int cntk = counts[k];
for(int j = 0; j < colIndexes.length; j++) {
- sum[j] += cntk * getValueByte(valOff + j);
+ sum[j] += cntk * getValueByte(valOff++);
}
}
for(int j = 0; j < colIndexes.length; j++) {
@@ -264,13 +267,14 @@
}
else {
KahanObject kbuff = new KahanObject(0, 0);
- for(int k = 0, valOff = 0; k < _values.length; k++, valOff += colIndexes.length) {
+ int valOff = 0;
+ for(int k = 0; k < _values.length/ colIndexes.length; k++) {
int cntk = counts[k];
for(int j = 0; j < colIndexes.length; j++) {
- kbuff.set(c[colIndexes[j]], c[colIndexes[j] + colIndexes.length]);
- kplus.execute3(kbuff, getValue(valOff + j), cntk);
+ kbuff.set(c[colIndexes[j]], c[colIndexes[j] + rows]);
+ kplus.execute3(kbuff, getValue(valOff++), cntk);
c[colIndexes[j]] = kbuff._sum;
- c[colIndexes[j] + colIndexes.length] = kbuff._correction;
+ c[colIndexes[j] + rows] = kbuff._correction;
}
}
}
@@ -280,20 +284,22 @@
protected double sum(int[] counts, int ncol, KahanFunction kplus) {
if(!(kplus instanceof KahanPlusSq)) {
int sum = 0;
- for(int k = 0, valOff = 0; k < _values.length; k++, valOff += ncol) {
- int cntk = counts[k];
+ int valOff = 0;
+ for(int k = 0; k < _values.length / ncol; k++) {
+ int countK = counts[k];
for(int j = 0; j < ncol; j++) {
- sum += cntk * getValueByte(valOff + j);
+ sum += countK * getValueByte(valOff++);
}
}
return sum * _scale;
}
else {
KahanObject kbuff = new KahanObject(0, 0);
- for(int k = 0, valOff = 0; k < _values.length; k++, valOff += ncol) {
- int cntk = counts[k];
+ int valOff = 0;
+ for(int k = 0; k < _values.length / ncol; k++) {
+ int countK = counts[k];
for(int j = 0; j < ncol; j++) {
- kplus.execute3(kbuff, getValue(valOff + j), cntk);
+ kplus.execute3(kbuff, getValue(valOff++), countK);
}
}
return kbuff._sum;
diff --git a/src/main/java/org/apache/sysds/runtime/compress/utils/DblArrayIntListHashMap.java b/src/main/java/org/apache/sysds/runtime/compress/utils/DblArrayIntListHashMap.java
index aced358..32d7ae7 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/utils/DblArrayIntListHashMap.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/utils/DblArrayIntListHashMap.java
@@ -174,5 +174,23 @@
public int compareTo(DArrayIListEntry o) {
return compare(this, o);
}
+
+ @Override
+ public String toString(){
+ StringBuilder sb = new StringBuilder();
+ sb.append("[" + key + ", ");
+ sb.append( value + ", ");
+ sb.append( next + "]");
+ return sb.toString();
+ }
+ }
+
+ @Override
+ public String toString(){
+ StringBuilder sb = new StringBuilder();
+ sb.append(this.getClass().getSimpleName() + this.hashCode());
+ for(DArrayIListEntry ent : _data)
+ sb.append("\n" + ent);
+ return sb.toString();
}
}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/utils/DoubleIntListHashMap.java b/src/main/java/org/apache/sysds/runtime/compress/utils/DoubleIntListHashMap.java
index 2a4d5f1..56f6169 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/utils/DoubleIntListHashMap.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/utils/DoubleIntListHashMap.java
@@ -20,6 +20,7 @@
package org.apache.sysds.runtime.compress.utils;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
@@ -157,5 +158,21 @@
return Double.compare(arg0.key, arg1.key);
}
+ @Override
+ public String toString(){
+ StringBuilder sb = new StringBuilder();
+ sb.append("[" + key + ", ");
+ sb.append( value + ", ");
+ sb.append( next + "]");
+ return sb.toString();
+ }
+ }
+
+ @Override
+ public String toString(){
+ StringBuilder sb = new StringBuilder();
+ sb.append(this.getClass().getSimpleName() + this.hashCode());
+ sb.append("\n" + Arrays.toString(_data));
+ return sb.toString();
}
}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/utils/IntArrayList.java b/src/main/java/org/apache/sysds/runtime/compress/utils/IntArrayList.java
index 25ee75b..e99ca2f 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/utils/IntArrayList.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/utils/IntArrayList.java
@@ -106,9 +106,13 @@
sb.append("IntArrayList ");
sb.append("size: " + _size);
if(_size == 1){
- sb.append(" [" + _val0+ "]");
+ sb.append(" [" + _val0+ "] ");
} else{
- sb.append(" " + Arrays.toString(_data));
+ sb.append(" [");
+ for(int i = 0; i < _size-1; i++){
+ sb.append(_data[i] + ", ");
+ }
+ sb.append(_data[_data.length-1]+"] ");
}
return sb.toString();
}
diff --git a/src/main/java/org/apache/sysds/runtime/compress/utils/LinearAlgebraUtils.java b/src/main/java/org/apache/sysds/runtime/compress/utils/LinearAlgebraUtils.java
index 83389ac..23e534e 100644
--- a/src/main/java/org/apache/sysds/runtime/compress/utils/LinearAlgebraUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/compress/utils/LinearAlgebraUtils.java
@@ -19,6 +19,8 @@
package org.apache.sysds.runtime.compress.utils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.sysds.runtime.data.DenseBlock;
import org.apache.sysds.runtime.matrix.data.LibMatrixMult;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
@@ -28,6 +30,7 @@
* LibMatrixMult, these calls are simply forwarded to ensure consistency in performance and result correctness.
*/
public class LinearAlgebraUtils {
+ protected static final Log LOG = LogFactory.getLog(LinearAlgebraUtils.class.getName());
// forwarded calls to LibMatrixMult
public static double dotProduct(double[] a, double[] b, final int len) {
@@ -82,6 +85,27 @@
}
+ public static void vectListAdd(final double[] values, double[] c, char[] bix, final int rl, final int ru,
+ final int off) {
+ final int bn = (ru - rl) % 8;
+
+ // rest, not aligned to 8-blocks
+ for(int j = rl; j < rl + bn; j++)
+ c[j + off] += values[bix[j]];
+
+ // unrolled 8-block (for better instruction-level parallelism)
+ for(int j = rl + bn; j < ru; j += 8) {
+ c[j + 0 + off] += values[bix[j + 0]];
+ c[j + 1 + off] += values[bix[j + 1]];
+ c[j + 2 + off] += values[bix[j + 2]];
+ c[j + 3 + off] += values[bix[j + 3]];
+ c[j + 4 + off] += values[bix[j + 4]];
+ c[j + 5 + off] += values[bix[j + 5]];
+ c[j + 6 + off] += values[bix[j + 6]];
+ c[j + 7 + off] += values[bix[j + 7]];
+ }
+ }
+
public static void vectListAdd(final double[] values, double[] c, char[] bix, final int rl, final int ru) {
final int bn = (ru - rl) % 8;
@@ -102,6 +126,67 @@
}
}
+ public static void vectListAdd(final double[] values, double[] c, byte[] bix, final int rl, final int ru,
+ final int off) {
+ final int bn = (ru - rl) % 8;
+
+ // rest, not aligned to 8-blocks
+ for(int j = rl; j < rl + bn; j++)
+ c[j + off] += values[bix[j] & 0xFF];
+
+ // unrolled 8-block (for better instruction-level parallelism)
+ for(int j = rl + bn; j < ru; j += 8) {
+ c[j + 0 + off] += values[bix[j + 0] & 0xFF];
+ c[j + 1 + off] += values[bix[j + 1] & 0xFF];
+ c[j + 2 + off] += values[bix[j + 2] & 0xFF];
+ c[j + 3 + off] += values[bix[j + 3] & 0xFF];
+ c[j + 4 + off] += values[bix[j + 4] & 0xFF];
+ c[j + 5 + off] += values[bix[j + 5] & 0xFF];
+ c[j + 6 + off] += values[bix[j + 6] & 0xFF];
+ c[j + 7 + off] += values[bix[j + 7] & 0xFF];
+ }
+ }
+
+ public static void vectListAddDDC(final double[] values, double[] c, byte[] bix, final int rl, final int ru,
+ final int cl, final int cu, final int cut, final int numVals) {
+ for(int j = rl, off = rl * cut; j < ru; j++, off += cut) {
+ int rowIdx = (bix[j] & 0xFF);
+ if(rowIdx < numVals)
+ for(int k = cl, h = rowIdx * (cu - cl); k < cu; k++, h++)
+ c[off + k] += values[h];
+ }
+ }
+
+ public static void vectListAddDDC(final double[] values, double[] c, char[] bix, final int rl, final int ru,
+ final int cl, final int cu, final int cut, final int numVals) {
+ for(int j = rl, off = rl * cut; j < ru; j++, off += cut) {
+ int rowIdx = bix[j];
+ if(rowIdx < numVals)
+ for(int k = cl, h = rowIdx * (cu - cl); k < cu; k++, h++)
+ c[off + k] += values[h];
+ }
+ }
+
+ /**
+ * Adds the values list into all rows of c within row and col range.
+ *
+ * @param values The values to Add
+ * @param c The double array to add into
+ * @param rl The row lower index
+ * @param ru The row upper index
+ * @param cl The column lower index
+ * @param cu The column upper index
+ * @param cut The total number of columns in c.
+ * @param valOff The offset into the values list to start reading from.
+ */
+ public static void vectListAdd(final double[] values, double[] c, final int rl, final int ru, final int cl,
+ final int cu, final int cut, final int valOff) {
+ for(int j = rl, off = rl * cut; j < ru; j++, off += cut) {
+ for(int k = cl, h = valOff; k < cu; k++, h++)
+ c[off + k] += values[h];
+ }
+ }
+
public static void vectListAdd(final double[] values, double[] c, byte[] bix, final int rl, final int ru) {
final int bn = (ru - rl) % 8;
diff --git a/src/main/java/org/apache/sysds/runtime/util/DataConverter.java b/src/main/java/org/apache/sysds/runtime/util/DataConverter.java
index 9a6e761..25a09a0 100644
--- a/src/main/java/org/apache/sysds/runtime/util/DataConverter.java
+++ b/src/main/java/org/apache/sysds/runtime/util/DataConverter.java
@@ -77,8 +77,8 @@
* (before executing MR jobs).
*
*/
-public class DataConverter
-{
+public class DataConverter {
+ // private static final Log LOG = LogFactory.getLog(DataConverter.class.getName());
private static final String DELIM = " ";
//////////////
@@ -258,7 +258,7 @@
int rows = mb.getNumRows();
int cols = mb.getNumColumns();
double[][] ret = new double[rows][cols]; //0-initialized
-
+
if( mb.getNonZeros() > 0 ) {
if( mb.isInSparseFormat() ) {
Iterator<IJV> iter = mb.getSparseBlockIterator();
diff --git a/src/test/java/org/apache/sysds/test/TestUtils.java b/src/test/java/org/apache/sysds/test/TestUtils.java
index d491df3..f01cf61 100644
--- a/src/test/java/org/apache/sysds/test/TestUtils.java
+++ b/src/test/java/org/apache/sysds/test/TestUtils.java
@@ -737,7 +737,7 @@
for (int i = 0; i < rows && countErrors < 50; i++) {
for (int j = 0; j < cols && countErrors < 50; j++) {
if (!compareCellValue(expectedMatrix[i][j], actualMatrix[i][j], epsilon, false)) {
- message += ("\n " +expectedMatrix[i][j] +" vs actual: "+actualMatrix[i][j]+" at "+i+" "+j);
+ message += ("\n Expected: " +expectedMatrix[i][j] +" vs actual: "+actualMatrix[i][j]+" at "+i+" "+j);
countErrors++;
}
}
@@ -765,7 +765,7 @@
for (int j = 0; j < cols; j++) {
if( !( (expectedFrame[i][j]==null && actualFrame[i][j]==null) ||
expectedFrame[i][j].equals(actualFrame[i][j]) || (expectedFrame[i][j]+".0").equals(actualFrame[i][j])) ) {
- System.out.println(expectedFrame[i][j] +" vs actual: "+actualFrame[i][j]+" at "+i+" "+j);
+ System.out.println("Expected:" + expectedFrame[i][j] +" vs actual: "+actualFrame[i][j]+" at "+i+" "+j);
countErrors++;
}
}
@@ -783,7 +783,7 @@
for (int i = 0; i < rows; i++) {
for (int j = 0; j < cols; j++) {
if( !compareScalarBits(expectedMatrix[i][j], actualMatrix[i][j], maxUnitsOfLeastPrecision)){
- System.out.println(expectedMatrix[i][j] +" vs actual: "+actualMatrix[i][j]+" at "+i+" "+j);
+ System.out.println("Expected: " + expectedMatrix[i][j] +" vs actual: "+actualMatrix[i][j]+" at "+i+" "+j);
countErrors++;
}
}
@@ -804,19 +804,20 @@
int countErrors = 0;
long sumDistance = 0;
long distance;
- for (int i = 0; i < rows && countErrors < 50; i++) {
- for (int j = 0; j < cols && countErrors < 50; j++) {
+ for (int i = 0; i < rows && countErrors < 20; i++) {
+ for (int j = 0; j < cols && countErrors < 20; j++) {
distance = compareScalarBits(expectedMatrix[i][j], actualMatrix[i][j]);
sumDistance += distance;
if(distance > maxUnitsOfLeastPrecision){
- message += ("\n " + expectedMatrix[i][j] +" vs actual: "+actualMatrix[i][j]+" at "+i+" "+j + " Distance in bits: " + distance);
+ message += ("\n Expected:" + expectedMatrix[i][j] +" vs actual: "+actualMatrix[i][j]+" at "+i+" "+j + " Distance in bits: " + distance);
countErrors++;
}
}
}
- if(countErrors == 50){
- assertTrue(message + "\n At least 50 values are not in equal", countErrors == 0);
- }else{
+ if(countErrors == 20){
+ assertTrue(message + "\n At least 20 values are not in equal", countErrors == 0);
+ }
+ else{
long avgDistance = sumDistance / (rows * cols);
assertTrue(message + "\n" + countErrors + " values are not in equal", countErrors == 0);
assertTrue(message + "\nThe avg distance in bits: "+ avgDistance +" was higher than max: " + maxAvgDistance,
@@ -872,20 +873,25 @@
double sumPercentDistance = 0;
double distance;
- for (int i = 0; i < rows; i++) {
- for (int j = 0; j < cols; j++) {
+ for (int i = 0; i < rows && countErrors < 20; i++) {
+ for (int j = 0; j < cols && countErrors < 20; j++) {
distance = getPercentDistance(expectedMatrix[i][j], actualMatrix[i][j], ignoreZero);
sumPercentDistance += distance;
if(distance < percentDistanceAllowed){
- message += ("\n"+ expectedMatrix[i][j] +" vs actual: "+actualMatrix[i][j]+" at "+i+" "+j + " Distance in percent " + distance);
+ message += ("\nExpected: "+ expectedMatrix[i][j] +" vs actual: "+actualMatrix[i][j]+" at "+i+" "+j + " Distance in percent " + distance);
countErrors++;
}
}
}
- double avgDistance = sumPercentDistance / (rows * cols);
- assertTrue(message + "\n" + countErrors + " values are not in equal of total: " + (rows * cols), countErrors == 0);
- assertTrue(message + "\nThe avg distance: "+ avgDistance +" was lower than threshold " + maxAveragePercentDistance,
- avgDistance > maxAveragePercentDistance);
+ if(countErrors == 20){
+ assertTrue(message + "\n At least 20 values are not in equal", countErrors == 0);
+ }
+ else{
+ double avgDistance = sumPercentDistance / (rows * cols);
+ assertTrue(message + "\n" + countErrors + " values are not in equal of total: " + (rows * cols), countErrors == 0);
+ assertTrue(message + "\nThe avg distance: "+ avgDistance +" was lower than threshold " + maxAveragePercentDistance,
+ avgDistance > maxAveragePercentDistance);
+ }
}
public static void compareMatricesBitAvgDistance(double[][] expectedMatrix, double[][] actualMatrix, int rows,
diff --git a/src/test/java/org/apache/sysds/test/component/compress/AbstractCompressedUnaryTests.java b/src/test/java/org/apache/sysds/test/component/compress/AbstractCompressedUnaryTests.java
index dd94aa2..90add0e 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/AbstractCompressedUnaryTests.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/AbstractCompressedUnaryTests.java
@@ -186,10 +186,13 @@
String css = compressionSettings.toString();
if(compressionSettings.lossy) {
if(aggType == AggType.COLSUMS) {
- TestUtils.compareMatrices(d1, d2, lossyTolerance * 150 * cols, css);
+ TestUtils.compareMatrices(d1, d2, lossyTolerance * 10 * rows, css);
}
else if(aggType == AggType.ROWSUMS) {
- TestUtils.compareMatrices(d1, d2, lossyTolerance * 16 * rows, css);
+ TestUtils.compareMatrices(d1, d2, lossyTolerance * 16 * cols, css);
+ }
+ else if(aggType == AggType.ROWSUMSSQ) {
+ TestUtils.compareMatricesPercentageDistance(d1, d2, 0.5, 0.9, css, true);
}
else if(aggType == AggType.SUM) {
TestUtils.compareMatrices(d1, d2, lossyTolerance * 10 * cols * rows, css);
@@ -201,8 +204,7 @@
TestUtils.compareMatrices(d1, d2, lossyTolerance, css);
}
else {
- boolean ignoreZero = true;
- TestUtils.compareMatricesPercentageDistance(d1, d2, 0.8, 0.9, css, ignoreZero);
+ TestUtils.compareMatricesPercentageDistance(d1, d2, 0.8, 0.9, css, true);
}
}
else {
diff --git a/src/test/java/org/apache/sysds/test/component/compress/CompressedMatrixTest.java b/src/test/java/org/apache/sysds/test/component/compress/CompressedMatrixTest.java
index ba83baa..76e8fb9 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/CompressedMatrixTest.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/CompressedMatrixTest.java
@@ -31,14 +31,11 @@
import org.apache.sysds.runtime.compress.CompressionSettings;
import org.apache.sysds.runtime.compress.CompressionStatistics;
import org.apache.sysds.runtime.compress.colgroup.ColGroup;
-import org.apache.sysds.runtime.functionobjects.Multiply;
import org.apache.sysds.runtime.matrix.data.LibMatrixCountDistinct;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
import org.apache.sysds.runtime.matrix.operators.CountDistinctOperator;
import org.apache.sysds.runtime.matrix.operators.CountDistinctOperator.CountDistinctTypes;
-import org.apache.sysds.runtime.matrix.operators.RightScalarOperator;
-import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
import org.apache.sysds.runtime.util.DataConverter;
import org.apache.sysds.test.TestUtils;
import org.apache.sysds.test.component.compress.TestConstants.MatrixTypology;
@@ -120,39 +117,6 @@
}
@Test
- public void testScalarOperations() {
- try {
- if(!(cmb instanceof CompressedMatrixBlock))
- return; // Input was not compressed then just pass test
-
- double mult = 7;
- // matrix-scalar uncompressed
- ScalarOperator sop = new RightScalarOperator(Multiply.getMultiplyFnObject(), mult, _k);
- MatrixBlock ret1 = mb.scalarOperations(sop, new MatrixBlock());
-
- // matrix-scalar compressed
- MatrixBlock ret2 = cmb.scalarOperations(sop, new MatrixBlock());
- if(ret2 instanceof CompressedMatrixBlock)
- ret2 = ((CompressedMatrixBlock) ret2).decompress();
-
- // compare result with input
- double[][] d1 = DataConverter.convertToDoubleMatrix(ret1);
- double[][] d2 = DataConverter.convertToDoubleMatrix(ret2);
- if(compressionSettings.lossy) {
- double modifiedTolerance = lossyTolerance * mult + lossyTolerance * 0.00001;
- TestUtils.compareMatrices(d1, d2, modifiedTolerance, compressionSettings.toString());
- }
- else {
- TestUtils.compareMatricesBitAvgDistance(d1, d2, 150, 1, compressionSettings.toString());
- }
- }
- catch(Exception e) {
- e.printStackTrace();
- throw new RuntimeException(this.toString() + "\n" + e.getMessage(), e);
- }
- }
-
- @Test
public void testCountDistinct() {
try {
if(!(cmb instanceof CompressedMatrixBlock))
@@ -211,7 +175,7 @@
double[][] d1 = DataConverter.convertToDoubleMatrix(mb);
double[][] d2 = DataConverter.convertToDoubleMatrix(tmp);
if(compressionSettings.lossy) {
- TestUtils.compareMatrices(d1, d2, lossyTolerance);
+ TestUtils.compareMatrices(d1, d2, lossyTolerance, compressionSettings.toString());
}
else {
TestUtils.compareMatricesBitAvgDistance(d1, d2, 0, 0, compressionSettings.toString());
@@ -252,18 +216,21 @@
allowedTolerance = sampleTolerance;
}
- StringBuilder builder = new StringBuilder();
- builder.append("\n\t" + String.format("%-40s - %12d", "Actual compressed size: ", actualSize));
- builder.append("\n\t" + String.format("%-40s - %12d with tolerance: %5d",
- "<= estimated isolated ColGroups: ",
- colsEstimate,
- allowedTolerance));
- builder.append("\n\t" + String.format("%-40s - %12d", "<= Original size: ", originalSize));
- builder.append("\n\tcol groups types: " + cStat.getGroupsTypesString());
- builder.append("\n\tcol groups sizes: " + cStat.getGroupsSizesString());
- builder.append("\n\t" + this.toString());
- boolean res = Math.abs(actualSize - colsEstimate) <= allowedTolerance;
- assertTrue(builder.toString(), res);
+ boolean res = Math.abs(colsEstimate - actualSize) <= originalSize;
+ res = res && actualSize - allowedTolerance < colsEstimate;
+ if(!res) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("\n\t" + String.format("%-40s - %12d", "Actual compressed size: ", actualSize));
+ builder.append("\n\t" + String.format("%-40s - %12d with tolerance: %5d",
+ "<= estimated isolated ColGroups: ",
+ colsEstimate,
+ allowedTolerance));
+ builder.append("\n\t" + String.format("%-40s - %12d", "<= Original size: ", originalSize));
+ builder.append("\n\tcol groups types: " + cStat.getGroupsTypesString());
+ builder.append("\n\tcol groups sizes: " + cStat.getGroupsSizesString());
+ builder.append("\n\t" + this.toString());
+ assertTrue(builder.toString(), res);
+ }
}
catch(Exception e) {
e.printStackTrace();
diff --git a/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java b/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java
index d14ed7f..5d3c35f 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/CompressedTestBase.java
@@ -54,6 +54,7 @@
import org.junit.Test;
import org.junit.runners.Parameterized.Parameters;
+
public abstract class CompressedTestBase extends TestBase {
protected static final Log LOG = LogFactory.getLog(CompressedTestBase.class.getName());
@@ -68,13 +69,10 @@
protected static ValueType[] usedValueTypes = new ValueType[] {
// ValueType.RAND,
// ValueType.CONST,
- // ValueType.RAND_ROUND,
- ValueType.OLE_COMPRESSIBLE,
- // ValueType.RLE_COMPRESSIBLE,
- };
+ ValueType.RAND_ROUND, ValueType.OLE_COMPRESSIBLE, ValueType.RLE_COMPRESSIBLE,};
protected static ValueRange[] usedValueRanges = new ValueRange[] {ValueRange.SMALL,
- ValueRange.LARGE,
+ // ValueRange.LARGE,
// ValueRange.BYTE
};
@@ -84,30 +82,28 @@
// CLA TESTS!
new CompressionSettingsBuilder().setSamplingRatio(0.1).setSeed(compressionSeed)
- .setValidCompressions(EnumSet.of(CompressionType.DDC)).setInvestigateEstimate(true).create(),
+ .setValidCompressions(EnumSet.of(CompressionType.DDC)).setInvestigateEstimate(true).create(),
new CompressionSettingsBuilder().setSamplingRatio(0.1).setSeed(compressionSeed)
- .setValidCompressions(EnumSet.of(CompressionType.OLE)).setInvestigateEstimate(true).create(),
+ .setValidCompressions(EnumSet.of(CompressionType.OLE)).setInvestigateEstimate(true).create(),
new CompressionSettingsBuilder().setSamplingRatio(0.1).setSeed(compressionSeed)
- .setValidCompressions(EnumSet.of(CompressionType.RLE)).setInvestigateEstimate(true).create(),
- new CompressionSettingsBuilder().setSamplingRatio(0.1).setSeed(compressionSeed).setInvestigateEstimate(true)
- .create(),
- new CompressionSettingsBuilder().setSamplingRatio(1.0).setSeed(compressionSeed).setInvestigateEstimate(true)
- .setAllowSharedDictionary(false).setmaxStaticColGroupCoCode(1).create(),
-
- // // LOSSY TESTS!
-
- new CompressionSettingsBuilder().setSamplingRatio(0.1).setSeed(compressionSeed)
- .setValidCompressions(EnumSet.of(CompressionType.DDC)).setInvestigateEstimate(true).setLossy(true).create(),
- new CompressionSettingsBuilder().setSamplingRatio(0.1).setSeed(compressionSeed)
- .setValidCompressions(EnumSet.of(CompressionType.OLE)).setInvestigateEstimate(true).setLossy(true).create(),
- new CompressionSettingsBuilder().setSamplingRatio(0.1).setSeed(compressionSeed)
- .setValidCompressions(EnumSet.of(CompressionType.RLE)).setInvestigateEstimate(true).setLossy(true).create(),
-
+ .setValidCompressions(EnumSet.of(CompressionType.RLE)).setInvestigateEstimate(true).create(),
new CompressionSettingsBuilder().setSamplingRatio(0.1).setSeed(compressionSeed).setInvestigateEstimate(true)
.create(),
-
new CompressionSettingsBuilder().setSamplingRatio(1.0).setSeed(compressionSeed).setInvestigateEstimate(true)
- .setAllowSharedDictionary(false).setmaxStaticColGroupCoCode(1).setLossy(true).create(),
+ .setAllowSharedDictionary(false).setmaxStaticColGroupCoCode(1).create(),
+
+ // // // // LOSSY TESTS!
+
+ new CompressionSettingsBuilder().setSamplingRatio(0.1).setSeed(compressionSeed)
+ .setValidCompressions(EnumSet.of(CompressionType.DDC)).setInvestigateEstimate(true).setLossy(true).create(),
+ new CompressionSettingsBuilder().setSamplingRatio(0.1).setSeed(compressionSeed)
+ .setValidCompressions(EnumSet.of(CompressionType.OLE)).setInvestigateEstimate(true).setLossy(true).create(),
+ new CompressionSettingsBuilder().setSamplingRatio(0.1).setSeed(compressionSeed)
+ .setValidCompressions(EnumSet.of(CompressionType.RLE)).setInvestigateEstimate(true).setLossy(true).create(),
+ new CompressionSettingsBuilder().setSamplingRatio(0.1).setSeed(compressionSeed).setInvestigateEstimate(true)
+ .create(),
+ new CompressionSettingsBuilder().setSamplingRatio(1.0).setSeed(compressionSeed).setInvestigateEstimate(true)
+ .setAllowSharedDictionary(false).setmaxStaticColGroupCoCode(1).setLossy(true).create(),
// COCODING TESTS!!
@@ -121,14 +117,14 @@
};
protected static MatrixTypology[] usedMatrixTypology = new MatrixTypology[] { // Selected Matrix Types
- MatrixTypology.SMALL,
- MatrixTypology.FEW_COL,
+ // MatrixTypology.SMALL, MatrixTypology.FEW_COL,
// MatrixTypology.FEW_ROW,
// MatrixTypology.LARGE,
- MatrixTypology.SINGLE_COL,
+ // MatrixTypology.SINGLE_COL,
// MatrixTypology.SINGLE_ROW,
- // MatrixTypology.L_ROWS,
+ MatrixTypology.L_ROWS,
// MatrixTypology.XL_ROWS,
+ // MatrixTypology.SINGLE_COL_L
};
// Compressed Block
@@ -161,7 +157,6 @@
if(cmb instanceof CompressedMatrixBlock) {
cmbDeCompressed = ((CompressedMatrixBlock) cmb).decompress(_k);
if(cmbDeCompressed != null) {
-
deCompressed = DataConverter.convertToDoubleMatrix(cmbDeCompressed);
}
}
@@ -169,7 +164,6 @@
cmbDeCompressed = null;
deCompressed = null;
}
-
}
catch(Exception e) {
e.printStackTrace();
@@ -177,17 +171,16 @@
}
}
+
/**
- * Tolerance for encoding values is the maximum value in dataset divided by number distinct values available in
- * a single Byte (since we encode our quntization in Byte)
+ * Tolerance for encoding values is the maximum value in dataset divided by number distinct values available in a
+ * single Byte (since we encode our quntization in Byte)
*
* @param valueRange The value range used as input
*/
private void setLossyTolerance(ValueRange valueRange) {
lossyTolerance = (double) (Math.max(TestConstants.getMaxRangeValue(valueRange),
Math.abs(TestConstants.getMinRangeValue(valueRange)))) * (1.0 / 127.0) / 2.0;
- // LOG.debug("TOLERANCE IN TEST:" + lossyTolerance);
-
}
@Parameters
@@ -292,7 +285,13 @@
TestUtils.compareMatricesPercentageDistance(d1, d2, 0.95, 0.95, compressionSettings.toString());
}
else {
- TestUtils.compareMatricesBitAvgDistance(d1, d2, 2048, 350, compressionSettings.toString());
+ if(rows > 50000) {
+ TestUtils
+ .compareMatricesPercentageDistance(d1, d2, 0.99, 0.999, compressionSettings.toString());
+ }
+ else {
+ TestUtils.compareMatricesBitAvgDistance(d1, d2, 2048, 350, compressionSettings.toString());
+ }
}
}
}
@@ -382,7 +381,7 @@
double[][] d1 = DataConverter.convertToDoubleMatrix(ret1);
double[][] d2 = DataConverter.convertToDoubleMatrix(ret2);
if(compressionSettings.lossy) {
- TestUtils.compareMatricesPercentageDistance(d1, d2, 0.35, 0.96, compressionSettings.toString());
+ TestUtils.compareMatricesPercentageDistance(d1, d2, 0.35, 0.92, compressionSettings.toString());
}
else {
TestUtils.compareMatricesBitAvgDistance(d1, d2, 10000, 500, compressionSettings.toString());
@@ -401,7 +400,7 @@
return; // Input was not compressed then just pass test
MatrixBlock matrix = DataConverter
- .convertToMatrixBlock(TestUtils.generateTestMatrix(2, rows, 0.9, 1.5, 1.0, 3));
+ .convertToMatrixBlock(TestUtils.generateTestMatrix(3, rows, 0.9, 1.5, 1.0, 3));
// Make Operator
AggregateBinaryOperator abop = InstructionUtils.getMatMultOperator(_k);
@@ -416,12 +415,12 @@
double[][] d1 = DataConverter.convertToDoubleMatrix(ret1);
double[][] d2 = DataConverter.convertToDoubleMatrix(ret2);
if(compressionSettings.lossy) {
- TestUtils.compareMatricesPercentageDistance(d1, d2, 0.25, 0.96, compressionSettings.toString());
+ TestUtils.compareMatricesPercentageDistance(d1, d2, 0.5, 0.92, compressionSettings.toString());
}
else {
// rows
if(rows > 65000) {
- TestUtils.compareMatricesPercentageDistance(d1, d2, 0.01, 0.99, compressionSettings.toString());
+ TestUtils.compareMatricesPercentageDistance(d1, d2, 0.5, 0.99, compressionSettings.toString());
}
else {
@@ -441,8 +440,9 @@
if(!(cmb instanceof CompressedMatrixBlock))
return; // Input was not compressed then just pass test
+ int cols = 50;
MatrixBlock matrix = DataConverter
- .convertToMatrixBlock(TestUtils.generateTestMatrix(402, rows, 0.9, 1.5, 1.0, 3));
+ .convertToMatrixBlock(TestUtils.generateTestMatrix(cols, rows, 0.9, 1.5, 1.0, 3));
// Make Operator
AggregateBinaryOperator abop = InstructionUtils.getMatMultOperator(_k);
@@ -457,14 +457,14 @@
double[][] d1 = DataConverter.convertToDoubleMatrix(ret1);
double[][] d2 = DataConverter.convertToDoubleMatrix(ret2);
if(compressionSettings.lossy) {
- TestUtils.compareMatricesPercentageDistance(d1, d2, 0.25, 0.96, compressionSettings.toString());
+ TestUtils.compareMatrices(d1, d2, lossyTolerance * cols * rows / 100, this.toString());
}
else {
if(rows > 65000) {
- TestUtils.compareMatricesPercentageDistance(d1, d2, 0.01, 0.99, compressionSettings.toString());
+ TestUtils.compareMatricesPercentageDistance(d1, d2, 0.90, 0.99, this.toString());
}
else {
- TestUtils.compareMatricesBitAvgDistance(d1, d2, 10000, 500, compressionSettings.toString());
+ TestUtils.compareMatricesBitAvgDistance(d1, d2, 1000 * 1000, 5096, this.toString());
}
}
}
@@ -475,7 +475,45 @@
}
@Test
- @Ignore
+ public void testLeftMatrixMatrixMultSparse() {
+ try {
+ if(!(cmb instanceof CompressedMatrixBlock))
+ return; // Input was not compressed then just pass test
+
+ MatrixBlock matrix = DataConverter
+ .convertToMatrixBlock(TestUtils.generateTestMatrix(2, rows, 0.9, 1.5, .1, 3));
+
+ // Make Operator
+ AggregateBinaryOperator abop = InstructionUtils.getMatMultOperator(_k);
+
+ // vector-matrix uncompressed
+ MatrixBlock ret1 = mb.aggregateBinaryOperations(matrix, mb, new MatrixBlock(), abop);
+
+ // vector-matrix compressed
+ MatrixBlock ret2 = cmb.aggregateBinaryOperations(matrix, cmb, new MatrixBlock(), abop);
+
+ // compare result with input
+ double[][] d1 = DataConverter.convertToDoubleMatrix(ret1);
+ double[][] d2 = DataConverter.convertToDoubleMatrix(ret2);
+ if(compressionSettings.lossy) {
+ TestUtils.compareMatricesPercentageDistance(d1, d2, 0.25, 0.83, compressionSettings.toString());
+ }
+ else {
+ if(rows > 65000) {
+ TestUtils.compareMatricesPercentageDistance(d1, d2, 0.99, 0.99, compressionSettings.toString());
+ }
+ else {
+ TestUtils.compareMatricesBitAvgDistance(d1, d2, 1000, 500, compressionSettings.toString());
+ }
+ }
+ }
+ catch(Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(this.toString() + "\n" + e.getMessage(), e);
+ }
+ }
+
+ @Test
public void testRightMatrixMatrixMultSmall() {
try {
if(!(cmb instanceof CompressedMatrixBlock))
@@ -497,11 +535,107 @@
double[][] d1 = DataConverter.convertToDoubleMatrix(ret1);
double[][] d2 = DataConverter.convertToDoubleMatrix(ret2);
if(compressionSettings.lossy) {
- TestUtils.compareMatricesPercentageDistance(d1, d2, 0.25, 0.96, compressionSettings.toString());
+ if(rows > 65000) {
+ TestUtils.compareMatrices(d1, d2, lossyTolerance * cols * rows / 50, this.toString());
+ }
+ else {
+ TestUtils.compareMatrices(d1, d2, lossyTolerance * cols * rows / 100, this.toString());
+ }
}
else {
if(rows > 65000) {
- TestUtils.compareMatricesPercentageDistance(d1, d2, 0.01, 0.99, compressionSettings.toString());
+ TestUtils.compareMatricesPercentageDistance(d1, d2, 0.5, 0.99, this.toString());
+ }
+ else {
+ TestUtils.compareMatricesBitAvgDistance(d1, d2, 10000, 500, this.toString());
+ }
+ }
+ }
+ catch(Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(this.toString() + "\n" + e.getMessage(), e);
+ }
+ }
+
+ @Test
+ public void testRightMatrixMatrixMultMedium() {
+ try {
+ if(!(cmb instanceof CompressedMatrixBlock))
+ return; // Input was not compressed then just pass test
+
+ MatrixBlock matrix = DataConverter
+ .convertToMatrixBlock(TestUtils.generateTestMatrix(cols, 16, 0.9, 1.5, 1.0, 3));
+
+ // Make Operator
+ AggregateBinaryOperator abop = InstructionUtils.getMatMultOperator(_k);
+
+ // vector-matrix uncompressed
+ MatrixBlock ret1 = mb.aggregateBinaryOperations(mb, matrix, new MatrixBlock(), abop);
+
+ // vector-matrix compressed
+ MatrixBlock ret2 = cmb.aggregateBinaryOperations(cmb, matrix, new MatrixBlock(), abop);
+
+ // compare result with input
+ double[][] d1 = DataConverter.convertToDoubleMatrix(ret1);
+ double[][] d2 = DataConverter.convertToDoubleMatrix(ret2);
+ if(compressionSettings.lossy) {
+ if(rows > 65000) {
+ TestUtils.compareMatrices(d1, d2, lossyTolerance * cols * rows / 50, this.toString());
+ }
+ else {
+ TestUtils.compareMatrices(d1, d2, lossyTolerance * cols * rows / 100, this.toString());
+
+ }
+ }
+ else {
+ if(rows > 65000) {
+ TestUtils.compareMatricesPercentageDistance(d1, d2, 0.5, 0.99, this.toString());
+ }
+ else {
+ TestUtils.compareMatricesBitAvgDistance(d1, d2, 10000, 500, this.toString());
+ }
+ }
+ }
+ catch(Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(this.toString() + "\n" + e.getMessage(), e);
+ }
+ }
+
+ @Test
+ public void testRightMatrixMatrixMultSparse() {
+ try {
+ if(!(cmb instanceof CompressedMatrixBlock))
+ return; // Input was not compressed then just pass test
+
+ MatrixBlock matrix = DataConverter
+ .convertToMatrixBlock(TestUtils.generateTestMatrix(cols, 25, 0.9, 1.5, 0.2, 3));
+
+ matrix.quickSetValue(0, 0, 10);
+ // Make Operator
+ AggregateBinaryOperator abop = InstructionUtils.getMatMultOperator(_k);
+
+ // vector-matrix uncompressed
+ MatrixBlock ret1 = mb.aggregateBinaryOperations(mb, matrix, new MatrixBlock(), abop);
+
+ // vector-matrix compressed
+ MatrixBlock ret2 = cmb.aggregateBinaryOperations(cmb, matrix, new MatrixBlock(), abop);
+
+ // compare result with input
+ double[][] d1 = DataConverter.convertToDoubleMatrix(ret1);
+ double[][] d2 = DataConverter.convertToDoubleMatrix(ret2);
+ if(compressionSettings.lossy) {
+ if(rows > 65000) {
+ TestUtils.compareMatrices(d1, d2, lossyTolerance * cols * rows / 10, this.toString());
+ }
+ else {
+ TestUtils.compareMatrices(d1, d2, lossyTolerance * cols * rows / 15, this.toString());
+
+ }
+ }
+ else {
+ if(rows > 65000) {
+ TestUtils.compareMatricesPercentageDistance(d1, d2, 0.5, 0.99, compressionSettings.toString());
}
else {
TestUtils.compareMatricesBitAvgDistance(d1, d2, 10000, 500, compressionSettings.toString());
@@ -534,11 +668,17 @@
double[][] d2 = DataConverter.convertToDoubleMatrix(ret2);
// High probability that The value is off by some amount
if(compressionSettings.lossy) {
- //Probably the worst thing you can do to increase the amount the values are estimated wrong
- TestUtils.compareMatricesPercentageDistance(d1, d2, 0.0, 0.8, compressionSettings.toString());
+ // Probably the worst thing you can do to increase the amount the values are estimated wrong
+ TestUtils.compareMatricesPercentageDistance(d1, d2, 0.5, 0.8, compressionSettings.toString());
}
else {
- TestUtils.compareMatricesBitAvgDistance(d1, d2, 2048, 64, compressionSettings.toString());
+ if(rows > 50000) {
+ TestUtils
+ .compareMatricesPercentageDistance(d1, d2, 0.99, 0.999, compressionSettings.toString());
+ }
+ else {
+ TestUtils.compareMatricesBitAvgDistance(d1, d2, 2048, 512, compressionSettings.toString());
+ }
}
}
}
@@ -549,6 +689,39 @@
}
@Test
+ public void testScalarOperations() {
+ try {
+ if(!(cmb instanceof CompressedMatrixBlock))
+ return; // Input was not compressed then just pass test
+
+ double mult = 7;
+ // matrix-scalar uncompressed
+ ScalarOperator sop = new RightScalarOperator(Multiply.getMultiplyFnObject(), mult, _k);
+ MatrixBlock ret1 = mb.scalarOperations(sop, new MatrixBlock());
+
+ // matrix-scalar compressed
+ MatrixBlock ret2 = cmb.scalarOperations(sop, new MatrixBlock());
+ if(ret2 instanceof CompressedMatrixBlock)
+ ret2 = ((CompressedMatrixBlock) ret2).decompress();
+
+ // compare result with input
+ double[][] d1 = DataConverter.convertToDoubleMatrix(ret1);
+ double[][] d2 = DataConverter.convertToDoubleMatrix(ret2);
+ if(compressionSettings.lossy) {
+ double modifiedTolerance = lossyTolerance * mult + lossyTolerance * 0.00001;
+ TestUtils.compareMatrices(d1, d2, modifiedTolerance, compressionSettings.toString());
+ }
+ else {
+ TestUtils.compareMatricesBitAvgDistance(d1, d2, 150, 1, compressionSettings.toString());
+ }
+ }
+ catch(Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(this.toString() + "\n" + e.getMessage(), e);
+ }
+ }
+
+ @Test
public void testScalarOperationsSparseUnsafe() {
try {
if(!(cmb instanceof CompressedMatrixBlock))
@@ -571,7 +744,7 @@
if(compressionSettings.lossy) {
double modifiedTolerance = Math.max(TestConstants.getMaxRangeValue(valRange) + addValue,
Math.abs(TestConstants.getMinRangeValue(valRange) + addValue)) * 2 / 127.0;
- TestUtils.compareMatrices(d1, d2, modifiedTolerance);
+ TestUtils.compareMatrices(d1, d2, modifiedTolerance, compressionSettings.toString());
}
else {
TestUtils.compareMatricesBitAvgDistance(d1, d2, 150, 1, compressionSettings.toString());
diff --git a/src/test/java/org/apache/sysds/test/component/compress/CompressedVectorTest.java b/src/test/java/org/apache/sysds/test/component/compress/CompressedVectorTest.java
index d407602..a2090c0 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/CompressedVectorTest.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/CompressedVectorTest.java
@@ -26,6 +26,7 @@
import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
import org.apache.sysds.runtime.compress.CompressionSettings;
+import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysds.runtime.functionobjects.CM;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.operators.CMOperator;
@@ -43,17 +44,11 @@
@RunWith(value = Parameterized.class)
public class CompressedVectorTest extends CompressedTestBase {
- private final int _k = 1;
-
protected static MatrixTypology[] usedMatrixTypologyLocal = new MatrixTypology[] {// types
- MatrixTypology.SINGLE_COL,
+ MatrixTypology.SINGLE_COL,
// MatrixTypology.SINGLE_COL_L
};
- protected int getK() {
- return _k;
- }
-
@Parameters
public static Collection<Object[]> data() {
ArrayList<Object[]> tests = new ArrayList<>();
@@ -73,7 +68,7 @@
public CompressedVectorTest(SparsityType sparType, ValueType valType, ValueRange valRange,
CompressionSettings compSettings, MatrixTypology matrixTypology) {
- super(sparType, valType, valRange, compSettings, matrixTypology, 1);
+ super(sparType, valType, valRange, compSettings, matrixTypology, InfrastructureAnalyzer.getLocalParallelism());
}
@Test
diff --git a/src/test/java/org/apache/sysds/test/component/compress/CompressibleInputGenerator.java b/src/test/java/org/apache/sysds/test/component/compress/CompressibleInputGenerator.java
index 811a85e..023d201 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/CompressibleInputGenerator.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/CompressibleInputGenerator.java
@@ -76,6 +76,7 @@
// LOG.debug("MAX: " + maxV + " - MIN:" + minV);
assertTrue(maxV <= max);
assertTrue(minV >= min);
+
}
return output;
}
diff --git a/src/test/java/org/apache/sysds/test/component/compress/ParCompressedMatrixTest.java b/src/test/java/org/apache/sysds/test/component/compress/ParCompressedMatrixTest.java
index 91cc710..57bf132 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/ParCompressedMatrixTest.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/ParCompressedMatrixTest.java
@@ -19,13 +19,20 @@
package org.apache.sysds.test.component.compress;
+import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
import org.apache.sysds.runtime.compress.CompressionSettings;
import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
+import org.apache.sysds.runtime.instructions.InstructionUtils;
+import org.apache.sysds.runtime.matrix.data.MatrixBlock;
+import org.apache.sysds.runtime.matrix.operators.AggregateBinaryOperator;
import org.apache.sysds.runtime.matrix.operators.AggregateUnaryOperator;
+import org.apache.sysds.runtime.util.DataConverter;
+import org.apache.sysds.test.TestUtils;
import org.apache.sysds.test.component.compress.TestConstants.MatrixTypology;
import org.apache.sysds.test.component.compress.TestConstants.SparsityType;
import org.apache.sysds.test.component.compress.TestConstants.ValueRange;
import org.apache.sysds.test.component.compress.TestConstants.ValueType;
+import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -44,4 +51,43 @@
testUnaryOperators(aggType, auop);
}
+ @Test
+ public void testLeftMatrixMatrixMultMediumSparse2() {
+ try {
+ if(!(cmb instanceof CompressedMatrixBlock))
+ return; // Input was not compressed then just pass test
+
+ MatrixBlock matrix = DataConverter
+ .convertToMatrixBlock(TestUtils.generateTestMatrix(132, rows, 0.9, 1.5, .1, 3));
+
+ // Make Operator
+ AggregateBinaryOperator abop = InstructionUtils.getMatMultOperator(_k);
+
+ // vector-matrix uncompressed
+ MatrixBlock ret1 = mb.aggregateBinaryOperations(matrix, mb, new MatrixBlock(), abop);
+
+ // vector-matrix compressed
+ MatrixBlock ret2 = cmb.aggregateBinaryOperations(matrix, cmb, new MatrixBlock(), abop);
+
+ // compare result with input
+ double[][] d1 = DataConverter.convertToDoubleMatrix(ret1);
+ double[][] d2 = DataConverter.convertToDoubleMatrix(ret2);
+ if(compressionSettings.lossy) {
+ TestUtils.compareMatricesPercentageDistance(d1, d2, 0.25, 0.83, compressionSettings.toString());
+ }
+ else {
+ if(rows > 65000) {
+ TestUtils.compareMatricesPercentageDistance(d1, d2, 0.50, 0.99, compressionSettings.toString());
+ }
+ else {
+ TestUtils.compareMatricesBitAvgDistance(d1, d2, 10000, 500, compressionSettings.toString());
+ }
+ }
+ }
+ catch(Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(this.toString() + "\n" + e.getMessage(), e);
+ }
+ }
+
}
diff --git a/src/test/java/org/apache/sysds/test/component/compress/TestConstants.java b/src/test/java/org/apache/sysds/test/component/compress/TestConstants.java
index 47895e8..5846686 100644
--- a/src/test/java/org/apache/sysds/test/component/compress/TestConstants.java
+++ b/src/test/java/org/apache/sysds/test/component/compress/TestConstants.java
@@ -24,8 +24,8 @@
*/
public class TestConstants {
- private static final int rows[] = {4, 2008, 1283, 5, 1, 251, 5000, 70000, 3123};
- private static final int cols[] = {20, 20, 13, 998, 321, 1, 8, 10, 1};
+ private static final int rows[] = {4, 2008, 1283, 5, 1, 100, 5000, 100000, 64000*2};
+ private static final int cols[] = {20, 20, 13, 998, 321, 1, 5, 1, 1};
private static final double[] sparsityValues = {0.9, 0.1, 0.01, 0.0, 1.0};
private static final int[] mins = {-10, -127 * 2};
diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties
index 9195367..d1f4e58 100644
--- a/src/test/resources/log4j.properties
+++ b/src/test/resources/log4j.properties
@@ -26,6 +26,8 @@
log4j.logger.org.apache.sysds.test.AutomatedTestBase=ERROR
log4j.logger.org.apache.sysds=WARN
log4j.logger.org.apache.sysds.runtime.compress.AbstractCompressedMatrixBlock=ERROR
+# log4j.logger.org.apache.sysds.runtime.compress.CompressedMatrixBlockFactory=DEBUG
+# log4j.logger.org.apache.sysds.runtime.compress.cocode=DEBUG
log4j.logger.org.apache.sysds.parser.DataExpression=ERROR
log4j.logger.org.apache.spark=OFF
log4j.logger.org.apache.hadoop=OFF