blob: 78a0bdba512c6909ec32f8c7a6328a8e87090b17 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.runtime.compress.colgroup;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import org.apache.commons.lang.NotImplementedException;
import org.apache.sysds.runtime.compress.colgroup.dictionary.ADictionary;
import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary;
import org.apache.sysds.runtime.compress.colgroup.dictionary.DictionaryFactory;
import org.apache.sysds.runtime.compress.colgroup.dictionary.MatrixBlockDictionary;
import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory;
import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex;
import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData;
import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory;
import org.apache.sysds.runtime.compress.colgroup.offset.AOffsetIterator;
import org.apache.sysds.runtime.compress.colgroup.scheme.DDCScheme;
import org.apache.sysds.runtime.compress.colgroup.scheme.ICLAScheme;
import org.apache.sysds.runtime.compress.cost.ComputationCostEstimator;
import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
import org.apache.sysds.runtime.compress.estim.EstimationFactors;
import org.apache.sysds.runtime.compress.estim.encoding.EncodingFactory;
import org.apache.sysds.runtime.compress.estim.encoding.IEncode;
import org.apache.sysds.runtime.data.DenseBlock;
import org.apache.sysds.runtime.data.SparseBlock;
import org.apache.sysds.runtime.functionobjects.Builtin;
import org.apache.sysds.runtime.functionobjects.Minus;
import org.apache.sysds.runtime.functionobjects.Plus;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.operators.BinaryOperator;
import org.apache.sysds.runtime.matrix.operators.ScalarOperator;
import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
/**
 * Class to encapsulate information about a column group that is encoded with dense dictionary encoding (DDC).
 *
 * <p>Every row has an entry in {@code _data} that maps the row to an index into the dictionary; the value tuple of
 * row {@code r} for the columns in {@code _colIndexes} is dictionary entry {@code _data.getIndex(r)}. There is no
 * default/zero row: all rows are materialized in the mapping.</p>
 */
public class ColGroupDDC extends APreAgg implements IMapToDataGroup {
  private static final long serialVersionUID = -5769772089913918987L;

  /** Row-to-dictionary-entry mapping; one entry per row of this group. */
  protected final AMapToData _data;

  private ColGroupDDC(IColIndex colIndexes, ADictionary dict, AMapToData data, int[] cachedCounts) {
    super(colIndexes, dict, cachedCounts);
    _data = data;
  }

  /**
   * Create a DDC column group, degenerating to a more specialized group when possible.
   *
   * <p>Note the branch order: a mapping with a single unique value becomes a constant group even before the
   * null-dictionary check.</p>
   *
   * @param colIndexes   The column indexes contained in the group
   * @param dict         The dictionary of distinct value tuples (null indicates no values, i.e. an empty group)
   * @param data         Mapping from each row to a dictionary entry
   * @param cachedCounts Optional precomputed occurrence counts per dictionary entry (may be null)
   * @return A ColGroupDDC, or a ColGroupConst / ColGroupEmpty in the degenerate cases
   */
  public static AColGroup create(IColIndex colIndexes, ADictionary dict, AMapToData data, int[] cachedCounts) {
    if(data.getUnique() == 1)
      return ColGroupConst.create(colIndexes, dict);
    else if(dict == null)
      return new ColGroupEmpty(colIndexes);
    else
      return new ColGroupDDC(colIndexes, dict, data, cachedCounts);
  }

  /**
   * Convert this group to a frame-of-reference (FOR) variant; delegates entirely to
   * {@link ColGroupDDCFOR#sparsifyFOR}.
   *
   * @return The (possibly) FOR-encoded replacement group
   */
  public AColGroup sparsifyFOR() {
    return ColGroupDDCFOR.sparsifyFOR(this);
  }

  // NOTE(review): likely overrides AColGroup#getCompType but lacks @Override — confirm against superclass.
  public CompressionType getCompType() {
    return CompressionType.DDC;
  }

  @Override
  protected void decompressToDenseBlockSparseDictionary(DenseBlock db, int rl, int ru, int offR, int offC,
    SparseBlock sb) {
    // For each row, add the sparse dictionary row selected by the mapping into the dense output.
    for(int r = rl, offT = rl + offR; r < ru; r++, offT++) {
      final int vr = _data.getIndex(r);
      if(sb.isEmpty(vr))
        continue; // dictionary entry is all-zero; nothing to add
      final double[] c = db.values(offT);
      final int off = db.pos(offT) + offC;
      final int apos = sb.pos(vr);
      final int alen = sb.size(vr) + apos;
      final int[] aix = sb.indexes(vr);
      final double[] aval = sb.values(vr);
      for(int j = apos; j < alen; j++)
        c[off + _colIndexes.get(aix[j])] += aval[j];
    }
  }

  @Override
  protected void decompressToDenseBlockDenseDictionary(DenseBlock db, int rl, int ru, int offR, int offC,
    double[] values) {
    // Dispatch to a specialized kernel depending on output contiguity, column count and offsets.
    if(db.isContiguous()) {
      final int nCol = db.getDim(1);
      if(_colIndexes.size() == 1 && nCol == 1)
        decompressToDenseBlockDenseDictSingleColOutContiguous(db, rl, ru, offR, offC, values);
      else if(_colIndexes.size() == 1)
        decompressToDenseBlockDenseDictSingleColContiguous(db, rl, ru, offR, offC, values);
      else if(_colIndexes.size() == nCol) // offC == 0 implied
        decompressToDenseBlockDenseDictAllColumnsContiguous(db, rl, ru, offR, values);
      else if(offC == 0 && offR == 0)
        decompressToDenseBlockDenseDictNoOff(db, rl, ru, values);
      else if(offC == 0)
        decompressToDenseBlockDenseDictNoColOffset(db, rl, ru, offR, values);
      else
        decompressToDenseBlockDenseDictGeneric(db, rl, ru, offR, offC, values);
    }
    else
      decompressToDenseBlockDenseDictGeneric(db, rl, ru, offR, offC, values);
  }

  // Single compressed column into a contiguous multi-column output: stride by the output row length.
  private final void decompressToDenseBlockDenseDictSingleColContiguous(DenseBlock db, int rl, int ru, int offR,
    int offC, double[] values) {
    final double[] c = db.values(0);
    final int nCols = db.getDim(1);
    final int colOff = _colIndexes.get(0) + offC;
    for(int i = rl, offT = (rl + offR) * nCols + colOff; i < ru; i++, offT += nCols)
      c[offT] += values[_data.getIndex(i)];
  }

  @Override
  public AMapToData getMapToData() {
    return _data;
  }

  // Single compressed column into a single-column contiguous output: unit stride.
  private final void decompressToDenseBlockDenseDictSingleColOutContiguous(DenseBlock db, int rl, int ru, int offR,
    int offC, double[] values) {
    final double[] c = db.values(0);
    for(int i = rl, offT = rl + offR + _colIndexes.get(0) + offC; i < ru; i++, offT++)
      c[offT] += values[_data.getIndex(i)];
  }

  // Group covers all output columns: copy whole dictionary rows with straight-line offsets.
  private final void decompressToDenseBlockDenseDictAllColumnsContiguous(DenseBlock db, int rl, int ru, int offR,
    double[] values) {
    final double[] c = db.values(0);
    final int nCol = _colIndexes.size();
    for(int r = rl; r < ru; r++) {
      final int start = _data.getIndex(r) * nCol;
      final int end = start + nCol;
      final int offStart = (offR + r) * nCol;
      for(int vOff = start, off = offStart; vOff < end; vOff++, off++)
        c[off] += values[vOff];
    }
  }

  // Row offset present but no column offset; scatter through _colIndexes.
  private final void decompressToDenseBlockDenseDictNoColOffset(DenseBlock db, int rl, int ru, int offR,
    double[] values) {
    final int nCol = _colIndexes.size();
    final int colOut = db.getDim(1);
    int off = (rl + offR) * colOut;
    for(int i = rl, offT = rl + offR; i < ru; i++, off += colOut) {
      final double[] c = db.values(offT);
      final int rowIndex = _data.getIndex(i) * nCol;
      for(int j = 0; j < nCol; j++)
        c[off + _colIndexes.get(j)] += values[rowIndex + j];
    }
  }

  // No row or column offset; scatter through _colIndexes directly into the output.
  private final void decompressToDenseBlockDenseDictNoOff(DenseBlock db, int rl, int ru, double[] values) {
    final int nCol = _colIndexes.size();
    final int nColU = db.getDim(1);
    final double[] c = db.values(0);
    for(int i = rl; i < ru; i++) {
      final int off = i * nColU;
      final int rowIndex = _data.getIndex(i) * nCol;
      for(int j = 0; j < nCol; j++)
        c[off + _colIndexes.get(j)] += values[rowIndex + j];
    }
  }

  // Fully general fallback; handles non-contiguous blocks and arbitrary offsets via db.pos().
  private final void decompressToDenseBlockDenseDictGeneric(DenseBlock db, int rl, int ru, int offR, int offC,
    double[] values) {
    final int nCol = _colIndexes.size();
    for(int i = rl, offT = rl + offR; i < ru; i++, offT++) {
      final double[] c = db.values(offT);
      final int off = db.pos(offT) + offC;
      final int rowIndex = _data.getIndex(i) * nCol;
      for(int j = 0; j < nCol; j++)
        c[off + _colIndexes.get(j)] += values[rowIndex + j];
    }
  }

  @Override
  protected void decompressToSparseBlockSparseDictionary(SparseBlock ret, int rl, int ru, int offR, int offC,
    SparseBlock sb) {
    // Append the nonzeros of the selected sparse dictionary row to each output row.
    for(int r = rl, offT = rl + offR; r < ru; r++, offT++) {
      final int vr = _data.getIndex(r);
      if(sb.isEmpty(vr))
        continue;
      final int apos = sb.pos(vr);
      final int alen = sb.size(vr) + apos;
      final int[] aix = sb.indexes(vr);
      final double[] aval = sb.values(vr);
      for(int j = apos; j < alen; j++)
        ret.append(offT, offC + _colIndexes.get(aix[j]), aval[j]);
    }
  }

  @Override
  protected void decompressToSparseBlockDenseDictionary(SparseBlock ret, int rl, int ru, int offR, int offC,
    double[] values) {
    // Appends all dictionary cells, including zeros — presumably SparseBlock.append tolerates/filters
    // zero values; verify against SparseBlock implementations.
    final int nCol = _colIndexes.size();
    for(int i = rl, offT = rl + offR; i < ru; i++, offT++) {
      final int rowIndex = _data.getIndex(i) * nCol;
      for(int j = 0; j < nCol; j++)
        ret.append(offT, _colIndexes.get(j) + offC, values[rowIndex + j]);
    }
  }

  @Override
  public double getIdx(int r, int colIdx) {
    // colIdx is the position inside this group's columns, not a global column index.
    return _dict.getValue(_data.getIndex(r), colIdx, _colIndexes.size());
  }

  @Override
  protected void computeRowSums(double[] c, int rl, int ru, double[] preAgg) {
    // preAgg holds one pre-aggregated value per dictionary entry.
    for(int rix = rl; rix < ru; rix++)
      c[rix] += preAgg[_data.getIndex(rix)];
  }

  @Override
  protected void computeRowMxx(double[] c, Builtin builtin, int rl, int ru, double[] preAgg) {
    for(int i = rl; i < ru; i++)
      c[i] = builtin.execute(c[i], preAgg[_data.getIndex(i)]);
  }

  @Override
  protected void computeRowProduct(double[] c, int rl, int ru, double[] preAgg) {
    for(int rix = rl; rix < ru; rix++)
      c[rix] *= preAgg[_data.getIndex(rix)];
  }

  @Override
  public int[] getCounts(int[] counts) {
    return _data.getCounts(counts);
  }

  @Override
  public void leftMultByMatrixNoPreAgg(MatrixBlock matrix, MatrixBlock result, int rl, int ru, int cl, int cu) {
    if(_colIndexes.size() == 1)
      leftMultByMatrixNoPreAggSingleCol(matrix, result, rl, ru, cl, cu);
    else
      lmMatrixNoPreAggMultiCol(matrix, result, rl, ru, cl, cu);
  }

  private void leftMultByMatrixNoPreAggSingleCol(MatrixBlock matrix, MatrixBlock result, int rl, int ru, int cl,
    int cu) {
    final double[] retV = result.getDenseBlockValues();
    final int nColM = matrix.getNumColumns();
    final int nColRet = result.getNumColumns();
    final double[] dictVals = _dict.getValues(); // guaranteed dense double since we only have one column.
    if(matrix.isInSparseFormat()) {
      // Sparse input only supported over the full row range of the compressed data.
      if(cl != 0 || cu != _data.size())
        throw new NotImplementedException();
      lmSparseMatrixNoPreAggSingleCol(matrix.getSparseBlock(), nColM, retV, nColRet, dictVals, rl, ru);
    }
    else
      lmDenseMatrixNoPreAggSingleCol(matrix.getDenseBlockValues(), nColM, retV, nColRet, dictVals, rl, ru, cl, cu);
  }

  private void lmSparseMatrixNoPreAggSingleCol(SparseBlock sb, int nColM, double[] retV, int nColRet, double[] vals,
    int rl, int ru) {
    final int colOut = _colIndexes.get(0);
    for(int r = rl; r < ru; r++) {
      if(sb.isEmpty(r))
        continue;
      final int apos = sb.pos(r);
      final int alen = sb.size(r) + apos;
      final int[] aix = sb.indexes(r);
      final double[] aval = sb.values(r);
      final int offR = r * nColRet;
      // Column index of the left matrix selects a row of the compressed group via the mapping.
      for(int i = apos; i < alen; i++)
        retV[offR + colOut] += aval[i] * vals[_data.getIndex(aix[i])];
    }
  }

  private void lmDenseMatrixNoPreAggSingleCol(double[] mV, int nColM, double[] retV, int nColRet, double[] vals,
    int rl, int ru, int cl, int cu) {
    final int colOut = _colIndexes.get(0);
    for(int r = rl; r < ru; r++) {
      final int offL = r * nColM;
      final int offR = r * nColRet;
      for(int c = cl; c < cu; c++)
        retV[offR + colOut] += mV[offL + c] * vals[_data.getIndex(c)];
    }
  }

  private void lmMatrixNoPreAggMultiCol(MatrixBlock matrix, MatrixBlock result, int rl, int ru, int cl, int cu) {
    if(matrix.isInSparseFormat()) {
      if(cl != 0 || cu != _data.size())
        throw new NotImplementedException(
          "Not implemented left multiplication on sparse without it being entire input");
      lmSparseMatrixNoPreAggMultiCol(matrix, result, rl, ru);
    }
    else
      lmDenseMatrixNoPreAggMultiCol(matrix, result, rl, ru, cl, cu);
  }

  private void lmSparseMatrixNoPreAggMultiCol(MatrixBlock matrix, MatrixBlock result, int rl, int ru) {
    final double[] retV = result.getDenseBlockValues();
    final int nColRet = result.getNumColumns();
    final SparseBlock sb = matrix.getSparseBlock();
    for(int r = rl; r < ru; r++) {
      if(sb.isEmpty(r))
        continue;
      final int apos = sb.pos(r);
      final int alen = sb.size(r) + apos;
      final int[] aix = sb.indexes(r);
      final double[] aval = sb.values(r);
      final int offR = r * nColRet;
      // Scale the selected dictionary tuple by the matrix value and scatter into the result row.
      for(int i = apos; i < alen; i++)
        _dict.multiplyScalar(aval[i], retV, offR, _data.getIndex(aix[i]), _colIndexes);
    }
  }

  private void lmDenseMatrixNoPreAggMultiCol(MatrixBlock matrix, MatrixBlock result, int rl, int ru, int cl, int cu) {
    final double[] retV = result.getDenseBlockValues();
    final int nColM = matrix.getNumColumns();
    final int nColRet = result.getNumColumns();
    final double[] mV = matrix.getDenseBlockValues();
    for(int r = rl; r < ru; r++) {
      final int offL = r * nColM;
      final int offR = r * nColRet;
      for(int c = cl; c < cu; c++)
        _dict.multiplyScalar(mV[offL + c], retV, offR, _data.getIndex(c), _colIndexes);
    }
  }

  @Override
  public void preAggregateDense(MatrixBlock m, double[] preAgg, int rl, int ru, int cl, int cu) {
    _data.preAggregateDense(m, preAgg, rl, ru, cl, cu);
  }

  @Override
  public void preAggregateSparse(SparseBlock sb, double[] preAgg, int rl, int ru) {
    _data.preAggregateSparse(sb, preAgg, rl, ru);
  }

  @Override
  public void preAggregateThatDDCStructure(ColGroupDDC that, Dictionary ret) {
    _data.preAggregateDDC_DDC(that._data, that._dict, ret, that._colIndexes.size());
  }

  @Override
  public void preAggregateThatSDCZerosStructure(ColGroupSDCZeros that, Dictionary ret) {
    _data.preAggregateDDC_SDCZ(that._data, that._dict, that._indexes, ret, that._colIndexes.size());
  }

  @Override
  public void preAggregateThatSDCSingleZerosStructure(ColGroupSDCSingleZeros that, Dictionary ret) {
    // Walk that group's offsets; for each offset row, add its single dictionary tuple (entry 0) into the
    // aggregation slot of this group's mapping at that row. Loop terminates on the last offset.
    final AOffsetIterator itThat = that._indexes.getOffsetIterator();
    final int nCol = that._colIndexes.size();
    final int finalOff = that._indexes.getOffsetToLast();
    final double[] v = ret.getValues();
    while(true) {
      final int to = _data.getIndex(itThat.value());
      that._dict.addToEntry(v, 0, to, nCol);
      if(itThat.value() == finalOff)
        break;
      itThat.next();
    }
  }

  @Override
  protected void preAggregateThatRLEStructure(ColGroupRLE that, Dictionary ret) {
    _data.preAggregateDDC_RLE(that._ptr, that._data, that._dict, ret, that._colIndexes.size());
  }

  @Override
  public boolean sameIndexStructure(AColGroupCompressed that) {
    // Reference equality on the mapping: only groups literally sharing the same AMapToData object match.
    return that instanceof ColGroupDDC && ((ColGroupDDC) that)._data == _data;
  }

  @Override
  public ColGroupType getColGroupType() {
    return ColGroupType.DDC;
  }

  @Override
  public long estimateInMemorySize() {
    long size = super.estimateInMemorySize();
    size += _data.getInMemorySize();
    return size;
  }

  @Override
  public AColGroup scalarOperation(ScalarOperator op) {
    if((op.fn instanceof Plus || op.fn instanceof Minus)) {
      final double v0 = op.executeScalar(0);
      if(v0 == 0)
        return this; // adding/subtracting zero is a no-op; keep this immutable group
      // Represent the shift as a frame-of-reference vector instead of rewriting the dictionary.
      final double[] reference = ColGroupUtils.createReference(_colIndexes.size(), v0);
      return ColGroupDDCFOR.create(_colIndexes, _dict, _data, getCachedCounts(), reference);
    }
    return create(_colIndexes, _dict.applyScalarOp(op), _data, getCachedCounts());
  }

  @Override
  public AColGroup unaryOperation(UnaryOperator op) {
    return create(_colIndexes, _dict.applyUnaryOp(op), _data, getCachedCounts());
  }

  @Override
  public AColGroup binaryRowOpLeft(BinaryOperator op, double[] v, boolean isRowSafe) {
    ADictionary ret = _dict.binOpLeft(op, v, _colIndexes);
    return create(_colIndexes, ret, _data, getCachedCounts());
  }

  @Override
  public AColGroup binaryRowOpRight(BinaryOperator op, double[] v, boolean isRowSafe) {
    // For plus/minus on a sparse dictionary, encode the row vector as a FOR reference —
    // presumably to avoid densifying the sparse dictionary; confirm with ColGroupDDCFOR.
    if((op.fn instanceof Plus || op.fn instanceof Minus) && _dict instanceof MatrixBlockDictionary &&
      ((MatrixBlockDictionary) _dict).getMatrixBlock().isInSparseFormat()) {
      final double[] reference = ColGroupUtils.binaryDefRowRight(op, v, _colIndexes);
      return ColGroupDDCFOR.create(_colIndexes, _dict, _data, getCachedCounts(), reference);
    }
    final ADictionary ret = _dict.binOpRight(op, v, _colIndexes);
    return create(_colIndexes, ret, _data, getCachedCounts());
  }

  @Override
  public void write(DataOutput out) throws IOException {
    // Serialization order: superclass fields (column indexes + dictionary), then the mapping.
    super.write(out);
    _data.write(out);
  }

  /**
   * Deserialize a ColGroupDDC; must mirror the field order of {@link #write(DataOutput)}.
   *
   * @param in The input to read from
   * @return The deserialized group (counts are not cached)
   * @throws IOException If reading fails
   */
  public static ColGroupDDC read(DataInput in) throws IOException {
    IColIndex cols = ColIndexFactory.read(in);
    ADictionary dict = DictionaryFactory.read(in);
    AMapToData data = MapToFactory.readIn(in);
    return new ColGroupDDC(cols, dict, data, null);
  }

  @Override
  public long getExactSizeOnDisk() {
    long ret = super.getExactSizeOnDisk();
    ret += _data.getExactSizeOnDisk();
    return ret;
  }

  @Override
  public double getCost(ComputationCostEstimator e, int nRows) {
    final int nVals = getNumValues();
    final int nCols = getNumCols();
    // nRows is passed for both row arguments — presumably rows and row-iterations; verify with estimator API.
    return e.getCost(nRows, nRows, nCols, nVals, _dict.getSparsity());
  }

  @Override
  protected int numRowsToMultiply() {
    return _data.size();
  }

  @Override
  protected double computeMxx(double c, Builtin builtin) {
    // All rows map to a dictionary entry, so aggregating the dictionary alone is sufficient.
    return _dict.aggregate(c, builtin);
  }

  @Override
  protected void computeColMxx(double[] c, Builtin builtin) {
    _dict.aggregateCols(c, builtin, _colIndexes);
  }

  @Override
  public boolean containsValue(double pattern) {
    return _dict.containsValue(pattern);
  }

  @Override
  protected AColGroup allocateRightMultiplication(MatrixBlock right, IColIndex colIndexes, ADictionary preAgg) {
    // The mapping is unchanged by a right multiplication; only the dictionary is replaced.
    if(preAgg != null)
      return create(colIndexes, preAgg, _data, getCachedCounts());
    else
      return null;
  }

  @Override
  public AColGroup sliceRows(int rl, int ru) {
    // Counts are invalidated by slicing, hence null cachedCounts.
    AMapToData sliceMap = _data.slice(rl, ru);
    return new ColGroupDDC(_colIndexes, _dict, sliceMap, null);
  }

  @Override
  protected AColGroup copyAndSet(IColIndex colIndexes, ADictionary newDictionary) {
    return create(colIndexes, newDictionary, _data, getCachedCounts());
  }

  @Override
  public AColGroup append(AColGroup g) {
    // Only appends when columns and dictionary match exactly; otherwise warns and returns null
    // so the caller can fall back to another strategy.
    if(g instanceof ColGroupDDC) {
      if(g.getColIndices().equals(_colIndexes)) {
        ColGroupDDC gDDC = (ColGroupDDC) g;
        if(gDDC._dict.equals(_dict)) {
          AMapToData nd = _data.append(gDDC._data);
          return create(_colIndexes, _dict, nd, null);
        }
        else
          LOG.warn("Not same Dictionaries therefore not appending DDC\n" + _dict + "\n\n" + gDDC._dict);
      }
      else
        LOG.warn("Not same columns therefore not appending DDC\n" + _colIndexes + "\n\n" + g.getColIndices());
    }
    else
      LOG.warn("Not DDC but " + g.getClass().getSimpleName() + ", therefore not appending DDC");
    return null;
  }

  @Override
  public AColGroup appendNInternal(AColGroup[] g) {
    // Validate all groups (starting at index 1; g[0] is presumably this group — confirm with caller)
    // before performing the combined append of the mappings.
    for(int i = 1; i < g.length; i++) {
      if(!_colIndexes.equals(g[i]._colIndexes)) {
        LOG.warn("Not same columns therefore not appending DDC\n" + _colIndexes + "\n\n" + g[i]._colIndexes);
        return null;
      }
      if(!(g[i] instanceof ColGroupDDC)) {
        LOG.warn("Not DDC but " + g[i].getClass().getSimpleName() + ", therefore not appending DDC");
        return null;
      }
      final ColGroupDDC gDDC = (ColGroupDDC) g[i];
      if(!gDDC._dict.equals(_dict)) {
        LOG.warn("Not same Dictionaries therefore not appending DDC\n" + _dict + "\n\n" + gDDC._dict);
        return null;
      }
    }
    AMapToData nd = _data.appendN(Arrays.copyOf(g, g.length, IMapToDataGroup[].class));
    return create(_colIndexes, _dict, nd, null);
  }

  @Override
  public ICLAScheme getCompressionScheme() {
    return DDCScheme.create(this);
  }

  @Override
  public AColGroup recompress() {
    // Already in DDC form; nothing to improve here.
    return this;
  }

  @Override
  public CompressedSizeInfoColGroup getCompressionInfo(int nRow) {
    // _data.size() is used for both row counts expected by EstimationFactors — verify its constructor contract.
    IEncode enc = getEncoding();
    EstimationFactors ef = new EstimationFactors(getNumValues(), _data.size(), _data.size(), _dict.getSparsity());
    return new CompressedSizeInfoColGroup(_colIndexes, ef, estimateInMemorySize(), getCompType(), enc);
  }

  @Override
  public IEncode getEncoding() {
    return EncodingFactory.create(_data);
  }

  @Override
  protected AColGroup fixColIndexes(IColIndex newColIndex, int[] reordering) {
    // Reorder dictionary columns to agree with the new column index order.
    return ColGroupDDC.create(newColIndex, _dict.reorder(reordering), _data, getCachedCounts());
  }

  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder();
    sb.append(super.toString());
    sb.append(String.format("\n%15s", "Data: "));
    sb.append(_data);
    return sb.toString();
  }
}