/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.sysds.runtime.compress;

import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.compress.cocode.PlanningCoCoder;
import org.apache.sysds.runtime.compress.colgroup.ColGroup;
import org.apache.sysds.runtime.compress.colgroup.ColGroupFactory;
import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimator;
import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimatorFactory;
import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
import org.apache.sysds.runtime.compress.utils.DblArrayIntListHashMap;
import org.apache.sysds.runtime.controlprogram.parfor.stat.Timing;
import org.apache.sysds.runtime.matrix.data.LibMatrixReorg;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.utils.DMLCompressionStatistics;

/**
* Factory pattern to construct a CompressedMatrixBlock.
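*
* A minimal usage sketch (hedged example; {@code mb} is any uncompressed MatrixBlock and
* {@code k} the desired parallelism, both placeholders):
*
* <pre>{@code
* Pair<MatrixBlock, CompressionStatistics> result = CompressedMatrixBlockFactory.compress(mb, k);
* MatrixBlock out = result.getLeft(); // CompressedMatrixBlock, or an uncompressed copy if compression was aborted
* CompressionStatistics stats = result.getRight();
* }</pre>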
*/
public class CompressedMatrixBlockFactory {
private static final Log LOG = LogFactory.getLog(CompressedMatrixBlockFactory.class.getName());
private static final CompressionSettings defaultCompressionSettings = new CompressionSettingsBuilder().create();

public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb) {
// Default sequential execution of compression
return compress(mb, 1, defaultCompressionSettings);
}

public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb,
CompressionSettings customSettings) {
return compress(mb, 1, customSettings);
}

public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb, int k) {
return compress(mb, k, defaultCompressionSettings);
}

/**
* The main method for compressing the input matrix.
*
* SAMPLE-BASED DECISIONS: Decisions such as testing if a column is amenable to bitmap compression or evaluating
* co-coding potentials are made based on a subset of the rows. For large data sets, sampling might take a
* significant amount of time. So, we generate only one sample and use it for the entire compression process.
*
* Once the compression plan is selected based on sampling, the plan is verified and decisions are overwritten by
* full estimates.
*
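* A sketch of calling this overload with explicit settings (hedged; {@code mb} and the thread
* count are placeholders, and only the default settings builder is shown):
*
* <pre>{@code
* CompressionSettings settings = new CompressionSettingsBuilder().create();
* Pair<MatrixBlock, CompressionStatistics> result =
*     CompressedMatrixBlockFactory.compress(mb, 8, settings);
* }</pre>
*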
* @param mb The matrix block to compress
* @param k The number of threads used to execute the compression
* @param compSettings The Compression settings used
* @return A compressed matrix block.
*/
public static Pair<MatrixBlock, CompressionStatistics> compress(MatrixBlock mb, int k,
CompressionSettings compSettings) {
// Check for redundant compression
if(mb instanceof CompressedMatrixBlock) {
throw new DMLRuntimeException("Redundant compression, block already compressed.");
}
Timing time = new Timing(true);
CompressionStatistics _stats = new CompressionStatistics();
// Prepare basic meta data and deep copy / transpose input
int numRows = mb.getNumRows();
int numCols = mb.getNumColumns();
boolean sparse = mb.isInSparseFormat();
// Transpose the MatrixBlock if the TransposeInput flag is set.
// This gives better cache consciousness, at a small upfront cost.
MatrixBlock rawBlock = !compSettings.transposeInput ? new MatrixBlock(mb) : LibMatrixReorg
.transpose(mb, new MatrixBlock(numCols, numRows, sparse), k);
// Construct sample-based size estimator
CompressedSizeEstimator sizeEstimator = CompressedSizeEstimatorFactory.getSizeEstimator(rawBlock, compSettings);
// --------------------------------------------------
// PHASE 1: Classify columns by compression type
// Start by determining which columns are amenable to compression
// Classify columns according to ratio (size uncompressed / size compressed),
// where a column is compressible if ratio > 1.
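// Illustrative example (hypothetical numbers, not from the estimator): a column estimated at
// 80KB uncompressed and 20KB compressed has ratio 4.0 and stays a compression candidate,
// while a column with ratio 0.9 is classified as incompressible.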
CompressedSizeInfo sizeInfos = sizeEstimator.computeCompressedSizeInfos(k);
if(compSettings.investigateEstimate)
_stats.estimatedSizeCols = sizeInfos.memoryEstimate();
_stats.setNextTimePhase(time.stop());
if(LOG.isDebugEnabled()){
LOG.debug("Compression statistics:");
LOG.debug("--compression phase 1: " + _stats.getLastTimePhase());
}
if(sizeInfos.colsC.isEmpty()) {
LOG.info("Abort block compression because all columns are incompressible.");
return new ImmutablePair<>(new MatrixBlock().copyShallow(mb), _stats);
}
// --------------------------------------------------
// --------------------------------------------------
// PHASE 2: Grouping columns
// Divide the columns into column groups.
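// Co-coding intuition (hedged, illustrative only): columns whose values are correlated are
// grouped so one dictionary of value tuples can cover several columns at once, e.g. a
// hypothetical partitioning of five columns into the groups [0, 2], [1], [3, 4].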
List<int[]> coCodeColGroups = PlanningCoCoder
.findCoCodesByPartitioning(sizeEstimator, sizeInfos, numRows, k, compSettings);
_stats.setNextTimePhase(time.stop());
if(LOG.isDebugEnabled()) {
LOG.debug("--compression phase 2: " + _stats.getLastTimePhase());
StringBuilder sb = new StringBuilder();
for(int[] group : coCodeColGroups)
sb.append(Arrays.toString(group));
LOG.debug(sb.toString());
}
// TODO: Make a second estimate of memory usage based on the co-coded column groups.
// This should already be computed inside the PlanningCoCoder, so the information
// should be returned from there rather than estimated twice.
// if(INVESTIGATE_ESTIMATES) {
// _stats.estimatedSizeColGroups = memoryEstimateIfColsAre(coCodeColGroups);
// }
// --------------------------------------------------
// --------------------------------------------------
// PHASE 3: Compress and correct sample-based decisions
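// As noted in the method javadoc, the sample-based plan is verified here against the full
// data, so per-group decisions may be overwritten by exact estimates during compression.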
ColGroup[] colGroups = ColGroupFactory
.compressColGroups(rawBlock, sizeInfos.compRatios, coCodeColGroups, compSettings, k);
// Make Compression happen!
CompressedMatrixBlock res = new CompressedMatrixBlock(mb);
List<ColGroup> colGroupList = ColGroupFactory.assignColumns(numCols, colGroups, rawBlock, compSettings);
res.allocateColGroupList(colGroupList);
_stats.setNextTimePhase(time.stop());
if(LOG.isDebugEnabled()) {
LOG.debug("Hash miss count: " + DblArrayIntListHashMap.hashMissCount);
DblArrayIntListHashMap.hashMissCount = 0;
LOG.debug("--compression phase 3: " + _stats.getLastTimePhase());
}
// --------------------------------------------------
// --------------------------------------------------
// PHASE 4: Best-effort dictionary sharing for DDC1 single-col groups
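// The (currently disabled) idea, per the commented-out createSharedDDC1Dictionary below:
// single-column DDC1 groups whose combined distinct values still fit a byte-indexed
// dictionary (255 entries, or 256 if zero is among the values) can point to one shared
// dictionary instead of each keeping a private copy.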
// Dictionary dict = (!(compSettings.validCompressions.contains(CompressionType.DDC)) ||
// !(compSettings.allowSharedDDCDictionary)) ? null : createSharedDDC1Dictionary(colGroupList);
// if(dict != null) {
// applySharedDDC1Dictionary(colGroupList, dict);
// res._sharedDDC1Dict = true;
// }
_stats.setNextTimePhase(time.stop());
if(LOG.isDebugEnabled()) {
LOG.debug("--compression phase 4: " + _stats.getLastTimePhase());
}
// --------------------------------------------------
// --------------------------------------------------
// PHASE 5: Cleanup
// The remaining columns are stored uncompressed as one big column group
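// Ratio sanity check (hypothetical numbers): an input of ~100MB that compresses to an
// estimated ~40MB gives ratio 2.5 and is kept; a ratio below 1 means the compressed
// representation is larger than the input, so a copy of the original block is returned instead.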
_stats.size = res.estimateCompressedSizeInMemory();
_stats.originalSize = mb.estimateSizeInMemory();
_stats.ratio = _stats.originalSize / (double) _stats.size;
if(_stats.ratio < 1) {
LOG.info("Abort block compression because compression ratio is less than 1.");
return new ImmutablePair<>(new MatrixBlock().copyShallow(mb), _stats);
}
// Final cleanup (discard uncompressed block)
rawBlock.cleanupBlock(true, true);
res.cleanupBlock(true, true);
_stats.setNextTimePhase(time.stop());
_stats.setColGroupsCounts(colGroupList);
if(LOG.isDebugEnabled()) {
LOG.debug("--num col groups: " + colGroupList.size() + ", -- num input cols: " + numCols);
LOG.debug("--compression phase 5: " + _stats.getLastTimePhase());
LOG.debug("--col groups types " + _stats.getGroupsTypesString());
LOG.debug("--col groups sizes " + _stats.getGroupsSizesString());
LOG.debug("--compressed size: " + _stats.size);
LOG.debug("--compression ratio: " + _stats.ratio);
if(LOG.isTraceEnabled()) {
for(ColGroup colGroup : colGroupList) {
LOG.trace("--colGroups colIndexes : " + Arrays.toString(colGroup.getColIndices()));
LOG.trace("--colGroups type : " + colGroup.getClass().getSimpleName());
LOG.trace("--colGroups Values : " + Arrays.toString(colGroup.getValues()));
}
}
}
if(DMLScript.STATISTICS) {
DMLCompressionStatistics.addCompressionTimes(_stats.getTimeArrayList());
}
return new ImmutablePair<>(res, _stats);
// --------------------------------------------------
}

/**
* Dictionary sharing between DDC ColGroups.
*
* @param colGroups The List of all ColGroups.
* @return the shared value list for the DDC ColGroups.
*/
// private static Dictionary createSharedDDC1Dictionary(List<ColGroup> colGroups) {
// // create joint dictionary
// HashSet<Double> vals = new HashSet<>();
// HashMap<Integer, Double> mins = new HashMap<>();
// HashMap<Integer, Double> maxs = new HashMap<>();
// int numDDC1 = 0;
// for(final ColGroup grp : colGroups)
// if(grp.getNumCols() == 1 && grp instanceof ColGroupDDC1) {
// final ColGroupDDC1 grpDDC1 = (ColGroupDDC1) grp;
// final double[] values = grpDDC1.getValues();
// double min = Double.POSITIVE_INFINITY;
// double max = Double.NEGATIVE_INFINITY;
// for(int i = 0; i < values.length; i++) {
// vals.add(values[i]);
// min = Math.min(min, values[i]);
// max = Math.max(max, values[i]);
// }
// mins.put(grpDDC1.getColIndex(0), min);
// maxs.put(grpDDC1.getColIndex(0), max);
// numDDC1++;
// }
// // abort shared dictionary creation if empty or too large
// int maxSize = vals.contains(0d) ? 256 : 255;
// if(numDDC1 < 2 || vals.size() > maxSize)
// return null;
// // build consolidated shared dictionary
// double[] values = vals.stream().mapToDouble(Double::doubleValue).toArray();
// int[] colIndexes = new int[numDDC1];
// double[] extrema = new double[2 * numDDC1];
// int pos = 0;
// for(Entry<Integer, Double> e : mins.entrySet()) {
// colIndexes[pos] = e.getKey();
// extrema[2 * pos] = e.getValue();
// extrema[2 * pos + 1] = maxs.get(e.getKey());
// pos++;
// }
// return new DictionaryShared(values, colIndexes, extrema);
// }
// private static void applySharedDDC1Dictionary(List<ColGroup> colGroups, Dictionary dict) {
// // create joint mapping table
// HashMap<Double, Integer> map = new HashMap<>();
// double[] values = dict.getValues();
// for(int i = 0; i < values.length; i++)
// map.put(values[i], i);
// // recode data of all relevant DDC1 groups
// for(ColGroup grp : colGroups)
// if(grp.getNumCols() == 1 && grp instanceof ColGroupDDC1) {
// ColGroupDDC1 grpDDC1 = (ColGroupDDC1) grp;
// grpDDC1.recodeData(map);
// grpDDC1.setDictionary(dict);
// }
// }
}