blob: 46a38f90078c198828ea9efcd2027d38c95772ec [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.runtime.compress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.compress.cocode.PlanningCoCoder;
import org.apache.sysds.runtime.compress.colgroup.ColGroup;
import org.apache.sysds.runtime.compress.colgroup.ColGroupFactory;
import org.apache.sysds.runtime.compress.colgroup.ColGroupUncompressed;
import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimator;
import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimatorFactory;
import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
import org.apache.sysds.runtime.compress.utils.DblArrayIntListHashMap;
import org.apache.sysds.runtime.controlprogram.parfor.stat.Timing;
import org.apache.sysds.runtime.matrix.data.LibMatrixReorg;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.utils.DMLCompressionStatistics;
/**
* Factory pattern to construct a CompressedMatrixBlock.
*/
public class CompressedMatrixBlockFactory {

	private static final Log LOG = LogFactory.getLog(CompressedMatrixBlockFactory.class.getName());

	/** Timer measuring the duration of each compression phase. */
	private final Timing time = new Timing(true);
	/** Statistics collected per phase; returned together with the result block. */
	private final CompressionStatistics _stats = new CompressionStatistics();
	/** Working block: shallow copy of the input, possibly replaced by its transpose. */
	private MatrixBlock mb;
	/** The unmodified input block; returned on abort and used for size statistics. */
	private MatrixBlock original;
	/** Degree of parallelism (number of threads) used throughout compression. */
	private final int k;
	/** Settings controlling sampling, transposition and co-coding decisions. */
	private final CompressionSettings compSettings;
	/** The compressed result; null until built, and reset to null on abort. */
	private CompressedMatrixBlock res = null;
	/** Phase counter: 0 classify, 1 co-code, 2 transpose, 3 compress, 4 cleanup. */
	private int phase = 0;
	/** Column partitions to co-compress; stays null if the input is not compressible. */
	private List<int[]> coCodeColGroups;

	private CompressedMatrixBlockFactory(MatrixBlock mb, int k, CompressionSettings compSettings) {
		this.mb = mb;
		this.k = k;
		this.compSettings = compSettings;
	}

	/**
	 * Compress the given matrix block sequentially with default settings.
	 *
	 * @param mb The matrix block to compress
	 * @return A pair of the resulting block (compressed, or the original on abort) and statistics
	 */
	public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb) {
		// Default sequential execution of compression
		return compress(mb, 1, new CompressionSettingsBuilder().create());
	}

	/**
	 * Compress the given matrix block sequentially with custom settings.
	 *
	 * @param mb             The matrix block to compress
	 * @param customSettings The compression settings to use
	 * @return A pair of the resulting block and statistics
	 */
	public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb,
		CompressionSettings customSettings) {
		return compress(mb, 1, customSettings);
	}

	/**
	 * Compress the given matrix block with default settings using k threads.
	 *
	 * @param mb The matrix block to compress
	 * @param k  The number of threads used to execute the compression
	 * @return A pair of the resulting block and statistics
	 */
	public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb, int k) {
		return compress(mb, k, new CompressionSettingsBuilder().create());
	}

	/**
	 * The main method for compressing the input matrix.
	 *
	 * SAMPLE-BASED DECISIONS: Decisions such as testing if a column is amenable to bitmap compression or evaluating
	 * co-coding potentials are made based on a subset of the rows. For large data sets, sampling might take a
	 * significant amount of time. So, we generate only one sample and use it for the entire compression process.
	 *
	 * Once the compression plan is selected based on sampling, the plan is verified and decisions are overwritten by
	 * full estimates.
	 *
	 * @param mb The matrix block to compress
	 * @param k The number of threads used to execute the compression
	 * @param compSettings The Compression settings used
	 * @return A compressed matrix block.
	 */
	public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb, int k,
		CompressionSettings compSettings) {
		CompressedMatrixBlockFactory cmbf = new CompressedMatrixBlockFactory(mb, k, compSettings);
		return cmbf.compressMatrix();
	}

	/**
	 * Drive the compression pipeline: classify, co-code, transpose, compress, cleanup.
	 * Aborts (returning the original block) if classification finds nothing compressible
	 * or if the achieved compression ratio is below 1.
	 *
	 * @return A pair of the resulting block and the collected statistics
	 * @throws DMLRuntimeException if the input is already compressed
	 */
	private Pair<MatrixBlock, CompressionStatistics> compressMatrix() {
		// Check for redundant compression
		if(mb instanceof CompressedMatrixBlock) {
			throw new DMLRuntimeException("Redundant compression, block already compressed.");
		}

		original = mb;
		// Work on a shallow copy so the caller's block is never modified in place.
		mb = new MatrixBlock().copyShallow(mb);

		classifyPhase();
		if(coCodeColGroups == null)
			return abortCompression();

		transposePhase();
		compressPhase();
		cleanupPhase();
		if(res == null)
			return abortCompression();

		return new ImmutablePair<>(res, _stats);
	}

	/**
	 * Phase 0: estimate per-column compressed sizes (sample-based) and decide whether the
	 * block is compressible at all; if so, continue with the co-coding phase.
	 */
	private void classifyPhase() {
		CompressedSizeEstimator sizeEstimator = CompressedSizeEstimatorFactory.getSizeEstimator(mb, compSettings);
		CompressedSizeInfo sizeInfos = sizeEstimator.computeCompressedSizeInfos(k);
		if(compSettings.investigateEstimate)
			_stats.estimatedSizeCols = sizeInfos.memoryEstimate();
		logPhase();

		// If nothing is compressible, coCodeColGroups stays null and the caller aborts.
		if(sizeInfos.isCompressible())
			coCodePhase(sizeEstimator, sizeInfos, mb.getNumRows());
	}

	/**
	 * Phase 1: partition the columns into groups that are compressed together (co-coding).
	 *
	 * @param sizeEstimator The shared size estimator (reuses the single sample)
	 * @param sizeInfos     Per-column compressed size estimates
	 * @param numRows       Number of rows in the working block
	 */
	private void coCodePhase(CompressedSizeEstimator sizeEstimator, CompressedSizeInfo sizeInfos, int numRows) {
		coCodeColGroups = PlanningCoCoder.findCoCodesByPartitioning(sizeEstimator, sizeInfos, numRows, k, compSettings);
		logPhase();
	}

	/**
	 * Phase 2: optionally transpose the working block so column extraction becomes row
	 * access, depending on the settings or the sparse-input heuristic; otherwise keep a
	 * shallow copy in the original orientation.
	 */
	private void transposePhase() {
		boolean sparse = mb.isInSparseFormat();
		transposeHeuristics();
		mb = compSettings.transposed ? LibMatrixReorg.transpose(mb,
			new MatrixBlock(mb.getNumColumns(), mb.getNumRows(), sparse),
			k) : new MatrixBlock(mb.getNumRows(), mb.getNumColumns(), sparse).copyShallow(mb);
		logPhase();
	}

	/**
	 * Decide whether to transpose: forced via settings ("true"/"false"); otherwise, for
	 * sparse inputs, transpose when the block is very tall or the number of co-coding
	 * groups exceeds half the number of columns. Dense inputs are never transposed here.
	 * Only called after the co-code phase, so coCodeColGroups is non-null.
	 */
	private void transposeHeuristics() {
		switch(compSettings.transposeInput) {
			case "true":
				compSettings.transposed = true;
				break;
			case "false":
				compSettings.transposed = false;
				break;
			default:
				if(original.isInSparseFormat()) {
					boolean isAboveRowNumbers = mb.getNumRows() > 1000000;
					// NOTE(review): despite the name, this compares co-code group count to columns.
					boolean isAboveThreadToColumnRatio = coCodeColGroups.size() > mb.getNumColumns() / 2;
					compSettings.transposed = isAboveRowNumbers || isAboveThreadToColumnRatio;
				}
				else
					compSettings.transposed = false;
		}
	}

	/**
	 * Phase 3: compress each planned column group and assemble the result block; columns
	 * whose compression failed are collected into a single uncompressed group.
	 */
	private void compressPhase() {
		res = new CompressedMatrixBlock(original);
		ColGroup[] colGroups = ColGroupFactory.compressColGroups(mb, null, coCodeColGroups, compSettings, k);
		List<ColGroup> colGroupList = assignColumns(original.getNumColumns(), colGroups);
		res.allocateColGroupList(colGroupList);
		logPhase();
	}

	/**
	 * Phase 4: release temporary buffers, compute size statistics, and abort (res = null)
	 * when the compressed representation is not smaller than the input.
	 */
	private void cleanupPhase() {
		res.cleanupBlock(true, true);
		mb.cleanupBlock(true, true);

		_stats.size = res.estimateCompressedSizeInMemory();
		// Measure the untouched input block: mb may have been transposed and was just
		// cleaned up above, so its estimate would not reflect the original input size.
		_stats.originalSize = original.estimateSizeInMemory();
		_stats.ratio = _stats.originalSize / (double) _stats.size;

		if(_stats.ratio < 1) {
			LOG.info("--compressed size: " + _stats.size);
			LOG.info("--compression ratio: " + _stats.ratio);
			LOG.info("Abort block compression because compression ratio is less than 1.");
			res = null;
			return;
		}

		_stats.setColGroupsCounts(res.getColGroups());
		logPhase();
	}

	/**
	 * Return the original block (and the statistics gathered so far) when compression is
	 * not possible or not beneficial.
	 *
	 * @return A pair of the original matrix block and the statistics
	 */
	private Pair<MatrixBlock, CompressionStatistics> abortCompression() {
		LOG.warn("Compression aborted at phase: " + phase);
		return new ImmutablePair<>(original, _stats);
	}

	/**
	 * Record the elapsed time of the finished phase, emit phase-specific debug logging,
	 * and advance the phase counter.
	 */
	private void logPhase() {
		_stats.setNextTimePhase(time.stop());
		DMLCompressionStatistics.addCompressionTime(_stats.getLastTimePhase(), phase);
		if(LOG.isDebugEnabled()) {
			switch(phase) {
				case 0:
					LOG.debug("--compression phase " + phase + " Classify : " + _stats.getLastTimePhase());
					break;
				case 1:
					LOG.debug("--compression phase " + phase + " Grouping : " + _stats.getLastTimePhase());
					break;
				case 2:
					LOG.debug("--compression phase " + phase + " Transpose : " + _stats.getLastTimePhase());
					break;
				case 3:
					LOG.debug("--compression phase " + phase + " Compress : " + _stats.getLastTimePhase());
					LOG.debug("--compression Hash collisions:" + DblArrayIntListHashMap.hashMissCount);
					DblArrayIntListHashMap.hashMissCount = 0;
					break;
				case 4:
					LOG.debug("--num col groups: " + res.getColGroups().size());
					LOG.debug("--compression phase " + phase + " Cleanup : " + _stats.getLastTimePhase());
					LOG.debug("--col groups types " + _stats.getGroupsTypesString());
					LOG.debug("--col groups sizes " + _stats.getGroupsSizesString());
					LOG.debug("--compressed size: " + _stats.size);
					LOG.debug("--compression ratio: " + _stats.ratio);
					// Dictionary size per group: number of distinct value tuples.
					int[] lengths = new int[res.getColGroups().size()];
					int i = 0;
					for(ColGroup colGroup : res.getColGroups()) {
						if(colGroup.getValues() != null)
							lengths[i++] = colGroup.getValues().length / colGroup.getColIndices().length;
					}
					LOG.debug("--compressed colGroup dictionary sizes: " + Arrays.toString(lengths));
					if(LOG.isTraceEnabled()) {
						for(ColGroup colGroup : res.getColGroups()) {
							LOG.trace("--colGroups colIndexes : " + Arrays.toString(colGroup.getColIndices()));
							LOG.trace("--colGroups type : " + colGroup.getClass().getSimpleName());
						}
					}
					break;
				default:
					break;
			}
		}
		phase++;
	}

	/**
	 * Collect the non-null compressed groups and wrap all columns that no group covers
	 * into a single uncompressed column group backed by the original block.
	 *
	 * @param numCols   Number of columns in the original matrix
	 * @param colGroups The compressed column groups (entries may be null for failed groups)
	 * @return The complete list of column groups covering all columns
	 */
	private List<ColGroup> assignColumns(int numCols, ColGroup[] colGroups) {
		// Find the columns that are not assigned yet, and assign them to uncompressed.
		List<ColGroup> _colGroups = new ArrayList<>();
		HashSet<Integer> remainingCols = seq(0, numCols - 1, 1);

		for(int j = 0; j < colGroups.length; j++) {
			if(colGroups[j] != null) {
				for(int col : colGroups[j].getColIndices())
					remainingCols.remove(col);
				_colGroups.add(colGroups[j]);
			}
		}

		if(!remainingCols.isEmpty()) {
			int[] list = remainingCols.stream().mapToInt(i -> i).toArray();
			LOG.warn("UnCompressible Columns: " + Arrays.toString(list));
			ColGroupUncompressed ucGroup = new ColGroupUncompressed(list, original, compSettings.transposed);
			_colGroups.add(ucGroup);
		}

		return _colGroups;
	}

	/**
	 * Build the set {from, from+incr, ..., &lt;= to}.
	 *
	 * @param from First value (inclusive)
	 * @param to   Last value (inclusive upper bound)
	 * @param incr Step between values
	 * @return A hash set containing the generated sequence
	 */
	private static HashSet<Integer> seq(int from, int to, int incr) {
		HashSet<Integer> ret = new HashSet<>();
		for(int i = from; i <= to; i += incr)
			ret.add(i);
		return ret;
	}
}