/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.runtime.compress;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.runtime.compress.cocode.CoCoderFactory;
import org.apache.sysds.runtime.compress.colgroup.AColGroup;
import org.apache.sysds.runtime.compress.colgroup.AColGroupValue;
import org.apache.sysds.runtime.compress.colgroup.ColGroupConst;
import org.apache.sysds.runtime.compress.colgroup.ColGroupEmpty;
import org.apache.sysds.runtime.compress.colgroup.ColGroupFactory;
import org.apache.sysds.runtime.compress.colgroup.ColGroupUncompressed;
import org.apache.sysds.runtime.compress.cost.ACostEstimate;
import org.apache.sysds.runtime.compress.cost.CostEstimatorBuilder;
import org.apache.sysds.runtime.compress.cost.CostEstimatorFactory;
import org.apache.sysds.runtime.compress.cost.InstructionTypeCounter;
import org.apache.sysds.runtime.compress.cost.MemoryCostEstimator;
import org.apache.sysds.runtime.compress.estim.AComEst;
import org.apache.sysds.runtime.compress.estim.ComEstFactory;
import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
import org.apache.sysds.runtime.compress.workload.WTreeRoot;
import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
import org.apache.sysds.runtime.controlprogram.context.ExecutionContext;
import org.apache.sysds.runtime.controlprogram.parfor.stat.Timing;
import org.apache.sysds.runtime.matrix.data.LibMatrixReorg;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.utils.DMLCompressionStatistics;

/**
* Factory pattern to compress a MatrixBlock into a CompressedMatrixBlock.
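*
* A minimal usage sketch (illustrative only; assumes {@code mb} is an already allocated MatrixBlock with a
* known number of non-zeros):
*
* <pre>{@code
* Pair<MatrixBlock, CompressionStatistics> r = CompressedMatrixBlockFactory.compress(mb, 4);
* MatrixBlock out = r.getLeft(); // a CompressedMatrixBlock, or an uncompressed block if compression aborted
* }</pre>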
*/
public class CompressedMatrixBlockFactory {
private static final Log LOG = LogFactory.getLog(CompressedMatrixBlockFactory.class.getName());
/** Timing object to measure the time of each phase in the compression */
private final Timing time = new Timing(true);
/** Compression statistics gathered throughout the compression */
private final CompressionStatistics _stats = new CompressionStatistics();
/** Parallelization degree */
private final int k;
/** Compression settings used for this compression */
private final CompressionSettings compSettings;
/** The main cost estimator used for the compression */
private final ACostEstimate costEstimator;
/** Time stamp of last phase */
private double lastPhase;
/** Reference to the original MatrixBlock that is about to be compressed. */
private MatrixBlock mb;
/** The resulting compressed matrix */
private CompressedMatrixBlock res;
/** The current Phase ID */
private int phase = 0;
/** Object that extracts statistics from individual columns, used to make compression decisions */
private AComEst informationExtractor;
/** Compression information gathered through sampling, used to decide and perform the actual compression */
private CompressedSizeInfo compressionGroups;
private CompressedMatrixBlockFactory(MatrixBlock mb, int k, CompressionSettingsBuilder compSettings,
ACostEstimate costEstimator) {
this(mb, k, compSettings.create(), costEstimator);
}
private CompressedMatrixBlockFactory(MatrixBlock mb, int k, CompressionSettings compSettings,
ACostEstimate costEstimator) {
this.mb = mb;
this.k = k;
this.compSettings = compSettings;
this.costEstimator = costEstimator;
}
/**
* Default sequential compression with no parallelization
*
* @param mb The matrixBlock to compress
* @return A Pair of a Matrix Block and Compression Statistics.
*/
public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb) {
return compress(mb, 1, new CompressionSettingsBuilder(), (WTreeRoot) null);
}
public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb, WTreeRoot root) {
return compress(mb, 1, new CompressionSettingsBuilder(), root);
}
public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb, CostEstimatorBuilder csb) {
return compress(mb, 1, new CompressionSettingsBuilder(), csb);
}
public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb, InstructionTypeCounter ins) {
if(ins == null)
return compress(mb, 1, new CompressionSettingsBuilder());
return compress(mb, 1, new CompressionSettingsBuilder(), new CostEstimatorBuilder(ins));
}
public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb,
CompressionSettingsBuilder customSettings) {
return compress(mb, 1, customSettings, (WTreeRoot) null);
}
public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb, int k) {
return compress(mb, k, new CompressionSettingsBuilder(), (WTreeRoot) null);
}
public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb, int k, WTreeRoot root) {
return compress(mb, k, new CompressionSettingsBuilder(), root);
}
public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb, int k, CostEstimatorBuilder csb) {
return compress(mb, k, new CompressionSettingsBuilder(), csb);
}
public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb, int k, InstructionTypeCounter ins) {
if(ins == null)
return compress(mb, 1, new CompressionSettingsBuilder());
return compress(mb, k, new CompressionSettingsBuilder(), new CostEstimatorBuilder(ins));
}
public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb, ACostEstimate costEstimator) {
return compress(mb, 1, new CompressionSettingsBuilder(), costEstimator);
}
public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb, int k, ACostEstimate costEstimator) {
return compress(mb, k, new CompressionSettingsBuilder(), costEstimator);
}
public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb, int k,
CompressionSettingsBuilder compSettings) {
return compress(mb, k, compSettings, (WTreeRoot) null);
}
public static void compressAsync(ExecutionContext ec, String varName) {
compressAsync(ec, varName, null);
}
public static void compressAsync(ExecutionContext ec, String varName, InstructionTypeCounter ins) {
LOG.debug("Compressing Async");
CompletableFuture.runAsync(() -> {
// asynchronously compress the variable's matrix block
CacheableData<?> data = ec.getCacheableData(varName);
if(data instanceof MatrixObject) {
MatrixObject mo = (MatrixObject) data;
MatrixBlock mb = mo.acquireReadAndRelease();
MatrixBlock mbc = CompressedMatrixBlockFactory.compress(mb, ins).getLeft();
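// only replace the variable's data if compression actually produced a CompressedMatrixBlock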
if(mbc instanceof CompressedMatrixBlock) {
ExecutionContext.createCacheableData(mb);
mo.acquireModify(mbc);
mo.release();
}
}
});
}
/**
* The main method for compressing the input matrix.
*
* @param mb The matrix block to compress
* @param k The number of threads used to execute the compression
* @param compSettings The Compression settings used
* @param root The root of the workload tree of instructions operating on the compressed matrix, used for
* calculating the computation cost of the compression
* @return A pair of a possibly compressed matrix block and compression statistics.
*/
public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb, int k,
CompressionSettingsBuilder compSettings, WTreeRoot root) {
CompressionSettings cs = compSettings.create();
ACostEstimate ice;
if(root == null)
ice = CostEstimatorFactory.create(cs, null, mb.getNumRows(), mb.getNumColumns(), mb.getSparsity());
else {
CostEstimatorBuilder csb = new CostEstimatorBuilder(root);
ice = CostEstimatorFactory.create(cs, csb, mb.getNumRows(), mb.getNumColumns(), mb.getSparsity());
}
CompressedMatrixBlockFactory cmbf = new CompressedMatrixBlockFactory(mb, k, cs, ice);
return cmbf.compressMatrix();
}
/**
* The main method for compressing the input matrix.
*
* @param mb The matrix block to compress
* @param k The number of threads used to execute the compression
* @param compSettings The Compression settings used
* @param csb The cost estimation builder
* @return A pair of a possibly compressed matrix block and compression statistics.
*/
public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb, int k,
CompressionSettingsBuilder compSettings, CostEstimatorBuilder csb) {
CompressionSettings cs = compSettings.create();
ACostEstimate ice = CostEstimatorFactory.create(cs, csb, mb.getNumRows(), mb.getNumColumns(), mb.getSparsity());
CompressedMatrixBlockFactory cmbf = new CompressedMatrixBlockFactory(mb, k, cs, ice);
return cmbf.compressMatrix();
}
public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb, int k,
CompressionSettingsBuilder compSettings, ACostEstimate costEstimator) {
CompressedMatrixBlockFactory cmbf = new CompressedMatrixBlockFactory(mb, k, compSettings, costEstimator);
return cmbf.compressMatrix();
}
/**
* Generate a CompressedMatrixBlock Object that contains a single uncompressed matrix block column group. Note this
* could be an empty colgroup if the input is empty.
*
* @param mb The matrix block to be contained in the uncompressed column group
* @return a CompressedMatrixBlock
*/
public static CompressedMatrixBlock genUncompressedCompressedMatrixBlock(MatrixBlock mb) {
CompressedMatrixBlock ret = new CompressedMatrixBlock(mb.getNumRows(), mb.getNumColumns());
AColGroup cg = ColGroupUncompressed.create(mb);
ret.allocateColGroup(cg);
ret.setNonZeros(mb.getNonZeros());
return ret;
}
/**
* Method for constructing a compressed matrix out of a constant input.
*
* Since the input is a constant value, it is trivially compressible; therefore we skip the entire compression
* planning and directly return a compressed constant matrix.
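*
* A short illustrative example (hypothetical dimensions and value):
*
* <pre>{@code
* // 1000 x 10 matrix where every cell is 3.14, stored as a single constant column group
* CompressedMatrixBlock c = CompressedMatrixBlockFactory.createConstant(1000, 10, 3.14);
* }</pre>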
*
* @param numRows The number of Rows in the matrix
* @param numCols The number of Columns in the matrix
* @param value The value contained in the matrix
* @return The Compressed Constant matrix.
*/
public static CompressedMatrixBlock createConstant(int numRows, int numCols, double value) {
CompressedMatrixBlock block = new CompressedMatrixBlock(numRows, numCols);
AColGroup cg = ColGroupConst.create(numCols, value);
block.allocateColGroup(cg);
block.recomputeNonZeros();
if(block.getNumRows() <= 0) // NCols is already checked
throw new DMLCompressionException("Invalid size of allocated constant compressed matrix block");
return block;
}
private Pair<MatrixBlock, CompressionStatistics> compressMatrix() {
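// Overall flow: classify individual columns, co-code them into groups, optionally transpose the input,
// compress the column groups, and finalize; abort whenever the cost estimates indicate no benefit.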
if(mb.getNonZeros() < 0)
throw new DMLCompressionException("Invalid to compress matrices with unknown nonZeros");
else if(mb instanceof CompressedMatrixBlock && ((CompressedMatrixBlock) mb).isOverlapping()) {
LOG.warn("Unsupported recompression of overlapping compression");
return new ImmutablePair<>(mb, null);
}
_stats.denseSize = MatrixBlock.estimateSizeInMemory(mb.getNumRows(), mb.getNumColumns(), 1.0);
_stats.sparseSize = MatrixBlock.estimateSizeSparseInMemory(mb.getNumRows(), mb.getNumColumns(), mb.getSparsity());
_stats.originalSize = mb.getInMemorySize();
_stats.originalCost = costEstimator.getCost(mb);
if(mb.isEmpty()) // empty input return empty compression
return createEmpty();
res = new CompressedMatrixBlock(mb); // copy metadata and allocate soft reference
classifyPhase();
if(compressionGroups == null)
return abortCompression();
// clear extra data from analysis
compressionGroups.clearMaps();
informationExtractor.clearNNZ();
transposePhase();
compressPhase();
finalizePhase();
if(res == null)
return abortCompression();
return new ImmutablePair<>(res, _stats);
}
private void classifyPhase() {
// Create the extractor for column statistics
informationExtractor = ComEstFactory.createEstimator(mb, compSettings, k);
// Compute the individual columns cost information
compressionGroups = informationExtractor.computeCompressedSizeInfos(k);
if(LOG.isTraceEnabled()) {
LOG.trace("Logging all individual columns estimated cost:");
for(CompressedSizeInfoColGroup g : compressionGroups.getInfo())
LOG.trace(String.format("Cost: %8.0f Size: %16.0f %15s", costEstimator.getCost(g), g.getMinSize(),
g.getColumns()));
}
_stats.estimatedSizeCols = compressionGroups.memoryEstimate();
_stats.estimatedCostCols = costEstimator.getCost(compressionGroups);
logPhase();
// final int nRows = mb.getNumRows();
final int nCols = mb.getNumColumns();
// Assume co-coding improves the cost by at most a factor of the square root of the number of columns.
final double scale = Math.sqrt(nCols);
final double threshold = _stats.estimatedCostCols / scale;
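// only proceed to co-coding if this optimistic best-case estimate beats the cost of the uncompressed input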
if(threshold < _stats.originalCost) {
if(nCols > 1)
coCodePhase();
else // log a short co-code phase (with a single column there is nothing to co-code)
logPhase();
}
else {
// abort compression
compressionGroups = null;
if(LOG.isInfoEnabled()) {
LOG.info("Aborting before co-code, because the compression looks bad");
LOG.info("Threshold was set to : " + threshold + " but it was above the original cost " + _stats.originalCost);
LOG.info("Original size : " + _stats.originalSize);
LOG.info("single col size : " + _stats.estimatedSizeCols);
if(!(costEstimator instanceof MemoryCostEstimator)) {
LOG.info("original cost : " + _stats.originalCost);
LOG.info("single col cost : " + _stats.estimatedCostCols);
}
}
}
}
private void coCodePhase() {
compressionGroups = CoCoderFactory.findCoCodesByPartitioning(informationExtractor, compressionGroups, k,
costEstimator, compSettings);
_stats.estimatedSizeCoCoded = compressionGroups.memoryEstimate();
_stats.estimatedCostCoCoded = costEstimator.getCost(compressionGroups);
logPhase();
// if the co-coded cost estimate is larger than the uncompressed cost, abort compression.
if(_stats.estimatedCostCoCoded > _stats.originalCost) {
// abort compression
compressionGroups = null;
if(LOG.isInfoEnabled()) {
LOG.info("Aborting after co-code, because the compression looks bad");
LOG.info("co-code size : " + _stats.estimatedSizeCoCoded);
LOG.info("original size : " + _stats.originalSize);
if(!(costEstimator instanceof MemoryCostEstimator)) {
LOG.info("original cost : " + _stats.originalCost);
LOG.info("single col cost : " + _stats.estimatedCostCols);
LOG.info("co-code cost : " + _stats.estimatedCostCoCoded);
}
}
}
}
private void transposePhase() {
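// only consider transposing if free memory exceeds roughly twice the estimated in-memory size of the input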
final boolean haveMemory = Runtime.getRuntime().freeMemory() - (mb.estimateSizeInMemory() * 2) > 0;
if(!compSettings.transposed && haveMemory) {
transposeHeuristics();
if(compSettings.transposed) {
boolean sparse = mb.isInSparseFormat();
mb = LibMatrixReorg.transpose(mb, new MatrixBlock(mb.getNumColumns(), mb.getNumRows(), sparse), k, true);
mb.evalSparseFormatInMemory();
}
}
logPhase();
}
private void transposeHeuristics() {
switch(compSettings.transposeInput) {
case "true":
compSettings.transposed = true;
break;
case "false":
compSettings.transposed = false;
break;
default:
if(mb.isInSparseFormat()) {
if(mb.getNumColumns() > 10000)
// with this many sparse columns we have to transpose
compSettings.transposed = true;
else if(mb.getNonZeros() < 1000)
// low nnz trivial to transpose
compSettings.transposed = true;
else {
// there are enough rows to make transposing worthwhile
boolean isAboveRowNumbers = mb.getNumRows() > 500000;
// Make sure that it is not more efficient to extract the rows.
boolean isAboveThreadToColumnRatio = compressionGroups.getNumberColGroups() > mb.getNumColumns() / 30;
compSettings.transposed = isAboveRowNumbers && isAboveThreadToColumnRatio;
}
}
else
compSettings.transposed = false;
}
}
private void compressPhase() {
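// materialize the actual column groups according to the chosen co-coding plan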
List<AColGroup> c = ColGroupFactory.compressColGroups(mb, compressionGroups, compSettings, costEstimator, k);
res.allocateColGroupList(c);
_stats.compressedInitialSize = res.getInMemorySize();
logPhase();
}
private void finalizePhase() {
res.cleanupBlock(true, true);
_stats.compressedSize = res.getInMemorySize();
_stats.compressedCost = costEstimator.getCost(res.getColGroups(), res.getNumRows());
final double ratio = _stats.getRatio();
final double denseRatio = _stats.getDenseRatio();
_stats.setColGroupsCounts(res.getColGroups());
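// abort if there is no size gain over the input, unless the result is still dramatically (100x) smaller than dense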
if(ratio < 1 && denseRatio < 100.0) {
LOG.info("--dense size: " + _stats.denseSize);
LOG.info("--original size: " + _stats.originalSize);
LOG.info("--compressed size: " + _stats.compressedSize);
LOG.info("--compression ratio: " + ratio);
LOG.debug("--col groups types " + _stats.getGroupsTypesString());
LOG.debug("--col groups sizes " + _stats.getGroupsSizesString());
logLengths();
LOG.info("Abort block compression because compression ratio is less than 1.");
res = null;
setNextTimePhase(time.stop());
DMLCompressionStatistics.addCompressionTime(getLastTimePhase(), phase);
return;
}
if(compSettings.isInSparkInstruction)
res.clearSoftReferenceToDecompressed();
res.setNonZeros(mb.getNonZeros());
logPhase();
}
private Pair<MatrixBlock, CompressionStatistics> abortCompression() {
LOG.warn("Compression aborted at phase: " + phase);
return new ImmutablePair<>(mb, _stats);
}
private void logPhase() {
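// phases: 0 classify, 1 grouping/co-code, 2 transpose, 3 compress, 4 cleanup/finalize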
setNextTimePhase(time.stop());
DMLCompressionStatistics.addCompressionTime(getLastTimePhase(), phase);
if(LOG.isDebugEnabled()) {
if(compSettings.isInSparkInstruction) {
if(phase == 4)
LOG.debug(_stats);
}
else {
switch(phase) {
case 0:
LOG.debug("--Seed used for comp : " + compSettings.seed);
LOG.debug("--compression phase " + phase + " Classify : " + getLastTimePhase());
LOG.debug("--Individual Columns Estimated Compression: " + _stats.estimatedSizeCols);
if(mb instanceof CompressedMatrixBlock) {
LOG.debug("--Recompressing already compressed MatrixBlock");
}
break;
case 1:
LOG.debug("--compression phase " + phase + " Grouping : " + getLastTimePhase());
LOG.debug("Grouping using: " + compSettings.columnPartitioner);
LOG.debug("Cost Calculated using: " + costEstimator);
LOG.debug("--Cocoded Columns estimated Compression:" + _stats.estimatedSizeCoCoded);
if(compressionGroups.getInfo().size() < 1000) {
LOG.debug("--Cocoded Columns estimated nr distinct:" + compressionGroups.getEstimatedDistinct());
LOG.debug("--Cocoded Columns nr columns :" + compressionGroups.getNrColumnsString());
}
else {
LOG.debug(
"--CoCoding produced many column groups; the first says:\n" + compressionGroups.getInfo().get(0));
}
break;
case 2:
LOG.debug("--compression phase " + phase + " Transpose : " + getLastTimePhase());
LOG.debug("Did transpose: " + compSettings.transposed);
break;
case 3:
LOG.debug("--compression phase " + phase + " Compress : " + getLastTimePhase());
LOG.debug("--compressed initial actual size:" + _stats.compressedInitialSize);
break;
case 4:
default:
LOG.debug("--num col groups: " + res.getColGroups().size());
LOG.debug("--compression phase " + phase + " Cleanup : " + getLastTimePhase());
LOG.debug("--col groups types " + _stats.getGroupsTypesString());
LOG.debug("--col groups sizes " + _stats.getGroupsSizesString());
LOG.debug("--input was compressed " + (mb instanceof CompressedMatrixBlock));
LOG.debug(String.format("--dense size: %16d", _stats.denseSize));
LOG.debug(String.format("--sparse size: %16d", _stats.sparseSize));
LOG.debug(String.format("--original size: %16d", _stats.originalSize));
LOG.debug(String.format("--compressed size: %16d", _stats.compressedSize));
LOG.debug(String.format("--compression ratio: %4.3f", _stats.getRatio()));
LOG.debug(String.format("--Dense ratio: %4.3f", _stats.getDenseRatio()));
if(!(costEstimator instanceof MemoryCostEstimator)) {
LOG.debug(String.format("--original cost: %5.2E", _stats.originalCost));
LOG.debug(String.format("--single col cost: %5.2E", _stats.estimatedCostCols));
LOG.debug(String.format("--cocode cost: %5.2E", _stats.estimatedCostCoCoded));
LOG.debug(String.format("--actual cost: %5.2E", _stats.compressedCost));
LOG.debug(
String.format("--relative cost: %1.4f", (_stats.compressedCost / _stats.originalCost)));
}
logLengths();
}
}
}
phase++;
}
private void logLengths() {
if(compressionGroups != null && compressionGroups.getInfo().size() < 1000) {
int[] lengths = new int[res.getColGroups().size()];
int i = 0;
for(AColGroup colGroup : res.getColGroups())
lengths[i++] = colGroup.getNumValues();
LOG.debug("--compressed colGroup dictionary sizes: " + Arrays.toString(lengths));
LOG.debug("--compressed colGroup nr columns : " + constructNrColumnString(res.getColGroups()));
}
if(LOG.isTraceEnabled()) {
for(AColGroup colGroup : res.getColGroups()) {
if(colGroup.estimateInMemorySize() < 1000)
LOG.trace(colGroup);
else {
LOG.trace(
"--colGroups type : " + colGroup.getClass().getSimpleName() + " size: "
+ colGroup.estimateInMemorySize()
+ ((colGroup instanceof AColGroupValue) ? " numValues :"
+ ((AColGroupValue) colGroup).getNumValues() : "")
+ " colIndexes : " + colGroup.getColIndices());
}
}
}
}
private void setNextTimePhase(double time) {
lastPhase = time;
}
private double getLastTimePhase() {
return lastPhase;
}
private Pair<MatrixBlock, CompressionStatistics> createEmpty() {
LOG.info("Empty input to compress, returning a compressed Matrix block with empty column group");
res = new CompressedMatrixBlock(mb.getNumRows(), mb.getNumColumns());
ColGroupEmpty cg = ColGroupEmpty.create(mb.getNumColumns());
res.allocateColGroup(cg);
res.setNonZeros(0);
_stats.compressedSize = res.getInMemorySize();
_stats.compressedCost = costEstimator.getCost(res.getColGroups(), res.getNumRows());
_stats.setColGroupsCounts(res.getColGroups());
phase = 4;
logPhase();
return new ImmutablePair<>(res, _stats);
}
private static String constructNrColumnString(List<AColGroup> cg) {
StringBuilder sb = new StringBuilder();
sb.append("[");
sb.append(cg.get(0).getNumCols());
for(int id = 1; id < cg.size(); id++)
sb.append(", " + cg.get(id).getNumCols());
sb.append("]");
return sb.toString();
}
}