/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
 */

package org.apache.sysds.runtime.compress;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map.Entry;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.compress.cocode.PlanningCoCoder;
import org.apache.sysds.runtime.compress.colgroup.ColGroup;
import org.apache.sysds.runtime.compress.colgroup.ColGroup.CompressionType;
import org.apache.sysds.runtime.compress.colgroup.ColGroupDDC1;
import org.apache.sysds.runtime.compress.colgroup.ColGroupFactory;
import org.apache.sysds.runtime.compress.colgroup.Dictionary;
import org.apache.sysds.runtime.compress.colgroup.DictionaryShared;
import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimator;
import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimatorFactory;
import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
import org.apache.sysds.runtime.controlprogram.parfor.stat.Timing;
import org.apache.sysds.runtime.matrix.data.LibMatrixReorg;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;

/**
* Factory pattern to construct a CompressedMatrixBlock.
*/
public class CompressedMatrixBlockFactory {
private static final Log LOG = LogFactory.getLog(CompressedMatrixBlockFactory.class.getName());
private static final CompressionSettings defaultCompressionSettings = new CompressionSettingsBuilder().create();
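
	/**
	 * Compress the given matrix block with the default compression settings, executed single-threaded.
	 *
	 * @param mb The matrix block to compress
	 * @return A compressed matrix block, or an uncompressed shallow copy of the input if compression is aborted
	 */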
public static MatrixBlock compress(MatrixBlock mb) {
// Default sequential execution of compression
return compress(mb, 1, defaultCompressionSettings);
}
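
	/**
	 * Compress the given matrix block with custom settings, executed single-threaded. Settings can be
	 * constructed via the {@link CompressionSettingsBuilder}, e.g.
	 * {@code new CompressionSettingsBuilder().create()} for the defaults.
	 *
	 * @param mb             The matrix block to compress
	 * @param customSettings The compression settings to use
	 * @return A compressed matrix block, or an uncompressed shallow copy of the input if compression is aborted
	 */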
public static MatrixBlock compress(MatrixBlock mb, CompressionSettings customSettings) {
return compress(mb, 1, customSettings);
}
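
	/**
	 * Compress the given matrix block with the default compression settings, using k threads.
	 *
	 * @param mb The matrix block to compress
	 * @param k  The number of threads used to execute the compression
	 * @return A compressed matrix block, or an uncompressed shallow copy of the input if compression is aborted
	 */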
public static MatrixBlock compress(MatrixBlock mb, int k) {
return compress(mb, k, defaultCompressionSettings);
	}

	/**
* The main method for compressing the input matrix.
*
 * SAMPLE-BASED DECISIONS: Decisions such as testing if a column is amenable to bitmap compression or evaluating
 * co-coding potential are made based on a subset of the rows. For large data sets, sampling can take a
 * significant amount of time, so we generate only one sample and reuse it for the entire compression process.
*
* Once the compression plan is selected based on sampling, the plan is verified and decisions are overwritten by
* full estimates.
*
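 * A minimal usage sketch (assuming {@code mb} holds an uncompressed matrix); if compression is
 * estimated not to pay off, the returned block is an uncompressed copy of the input:
 *
 * <pre>{@code
 * MatrixBlock ret = CompressedMatrixBlockFactory.compress(mb, 4);
 * boolean compressed = ret instanceof CompressedMatrixBlock;
 * }</pre>
 *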
* @param mb The matrix block to compress
* @param k The number of threads used to execute the compression
* @param compSettings The Compression settings used
* @return A compressed matrix block.
*/
public static MatrixBlock compress(MatrixBlock mb, int k, CompressionSettings compSettings) {
// Check for redundant compression
if(mb instanceof CompressedMatrixBlock && ((CompressedMatrixBlock) mb).isCompressed()) {
throw new DMLRuntimeException("Redundant compression, block already compressed.");
}
Timing time = new Timing(true);
CompressionStatistics _stats = new CompressionStatistics();
// Prepare basic meta data and deep copy / transpose input
int numRows = mb.getNumRows();
int numCols = mb.getNumColumns();
boolean sparse = mb.isInSparseFormat();
		// Transpose the MatrixBlock if the transposeInput flag is set.
		// This improves cache locality for the column-wise compression, at a small upfront cost.
		MatrixBlock rawBlock = !compSettings.transposeInput ? new MatrixBlock(mb) : LibMatrixReorg
			.transpose(mb, new MatrixBlock(numCols, numRows, sparse), k);
// Construct sample-based size estimator
CompressedSizeEstimator sizeEstimator = CompressedSizeEstimatorFactory.getSizeEstimator(rawBlock, compSettings);
// --------------------------------------------------
// PHASE 1: Classify columns by compression type
// Start by determining which columns are amenable to compression
// Classify columns according to ratio (size uncompressed / size compressed),
// where a column is compressible if ratio > 1.
CompressedSizeInfo sizeInfos = sizeEstimator.computeCompressedSizeInfos(k);
if(compSettings.investigateEstimate)
_stats.estimatedSizeCols = sizeInfos.memoryEstimate();
_stats.setNextTimePhase(time.stop());
LOG.debug("Compression statistics:");
LOG.debug("--compression phase 1: " + _stats.getLastTimePhase());
if(sizeInfos.colsC.isEmpty()) {
LOG.warn("Abort block compression because all columns are incompressible.");
return new MatrixBlock().copyShallow(mb);
}
// --------------------------------------------------
// --------------------------------------------------
// PHASE 2: Grouping columns
// Divide the columns into column groups.
List<int[]> coCodeColGroups = PlanningCoCoder.findCocodesByPartitioning(sizeEstimator, sizeInfos, numRows, k);
_stats.setNextTimePhase(time.stop());
LOG.debug("--compression phase 2: " + _stats.getLastTimePhase());
		// TODO: Make a second estimate of memory usage if the ColGroups are as above?
		// This should already be done inside the PlanningCoCoder, and therefore this information
		// should be returned from there rather than estimated twice.
		// if(compSettings.investigateEstimate) {
		//     _stats.estimatedSizeColGroups = memoryEstimateIfColsAre(coCodeColGroups);
		// }
// --------------------------------------------------
// --------------------------------------------------
// PHASE 3: Compress and correct sample-based decisions
ColGroup[] colGroups = ColGroupFactory
.compressColGroups(rawBlock, sizeInfos.compRatios, coCodeColGroups, compSettings, k);
		// Wrap the compressed column groups in a CompressedMatrixBlock; columns that were
		// not compressed are collected into a single large uncompressed column group.
		CompressedMatrixBlock res = new CompressedMatrixBlock(mb);
		List<ColGroup> colGroupList = ColGroupFactory.assignColumns(numCols, colGroups, rawBlock, compSettings);
res.allocateColGroupList(colGroupList);
_stats.setNextTimePhase(time.stop());
if(LOG.isDebugEnabled()) {
LOG.debug("--compression phase 3: " + _stats.getLastTimePhase());
}
// --------------------------------------------------
// --------------------------------------------------
// PHASE 4: Best-effort dictionary sharing for DDC1 single-col groups
Dictionary dict = (!(compSettings.validCompressions.contains(CompressionType.DDC)) ||
!(compSettings.allowSharedDDCDictionary)) ? null : createSharedDDC1Dictionary(colGroupList);
if(dict != null) {
applySharedDDC1Dictionary(colGroupList, dict);
res._sharedDDC1Dict = true;
}
_stats.setNextTimePhase(time.stop());
if(LOG.isDebugEnabled()) {
LOG.debug("--compression phase 4: " + _stats.getLastTimePhase());
}
// --------------------------------------------------
// --------------------------------------------------
		// PHASE 5: Statistics and cleanup
		// Compute compressed size statistics and discard the temporary uncompressed blocks.
_stats.size = res.estimateCompressedSizeInMemory();
_stats.originalSize = mb.estimateSizeInMemory();
_stats.ratio = _stats.originalSize / (double) _stats.size;
if(_stats.ratio < 1) {
LOG.warn("Abort block compression because compression ratio is less than 1.");
return new MatrixBlock().copyShallow(mb);
}
// Final cleanup (discard uncompressed block)
rawBlock.cleanupBlock(true, true);
res.cleanupBlock(true, true);
_stats.setNextTimePhase(time.stop());
_stats.setColGroupsCounts(colGroupList);
		LOG.info("--num col groups: " + colGroupList.size() + ", --num input cols: " + numCols);
LOG.debug("--compression phase 5: " + _stats.getLastTimePhase());
LOG.debug("--col groups types " + _stats.getGroupsTypesString());
LOG.debug("--col groups sizes " + _stats.getGroupsSizesString());
LOG.debug("--compressed size: " + _stats.size);
LOG.debug("--compression ratio: " + _stats.ratio);
		// Attach the statistics object to the result.
		// It could be dropped to slightly improve the compression ratio, since it adds around 64 bytes.
res._stats = _stats;
return res;
// --------------------------------------------------
	}

	/**
	 * Dictionary sharing between single-column DDC1 ColGroups.
	 *
	 * @param colGroups The list of all ColGroups.
	 * @return The shared dictionary, or null if sharing is not applicable (fewer than two
	 *         single-column DDC1 groups, or too many distinct values).
*/
private static Dictionary createSharedDDC1Dictionary(List<ColGroup> colGroups) {
// create joint dictionary
HashSet<Double> vals = new HashSet<>();
HashMap<Integer, Double> mins = new HashMap<>();
HashMap<Integer, Double> maxs = new HashMap<>();
int numDDC1 = 0;
		for(final ColGroup grp : colGroups) {
			if(grp.getNumCols() == 1 && grp instanceof ColGroupDDC1) {
				final ColGroupDDC1 grpDDC1 = (ColGroupDDC1) grp;
				final double[] values = grpDDC1.getValues();
				double min = Double.POSITIVE_INFINITY;
				double max = Double.NEGATIVE_INFINITY;
				for(int i = 0; i < values.length; i++) {
					vals.add(values[i]);
					min = Math.min(min, values[i]);
					max = Math.max(max, values[i]);
				}
				mins.put(grpDDC1.getColIndex(0), min);
				maxs.put(grpDDC1.getColIndex(0), max);
				numDDC1++;
			}
		}
		// Abort shared dictionary creation if there are too few DDC1 groups, or too many
		// distinct values for DDC1's 1-byte codes (if zero is absent from the values,
		// one of the 256 codes stays reserved for zero).
		int maxSize = vals.contains(0d) ? 256 : 255;
if(numDDC1 < 2 || vals.size() > maxSize)
return null;
// build consolidated shared dictionary
double[] values = vals.stream().mapToDouble(Double::doubleValue).toArray();
int[] colIndexes = new int[numDDC1];
double[] extrema = new double[2*numDDC1];
int pos = 0;
		for(Entry<Integer, Double> e : mins.entrySet()) {
			colIndexes[pos] = e.getKey();
			extrema[2 * pos] = e.getValue();
			extrema[2 * pos + 1] = maxs.get(e.getKey());
			pos++;
		}
return new DictionaryShared(values, colIndexes, extrema);
}
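
	/**
	 * Applies the given shared dictionary to all single-column DDC1 groups by recoding
	 * their data to the shared value positions and replacing their local dictionaries.
	 *
	 * @param colGroups The list of all ColGroups.
	 * @param dict      The shared dictionary to apply.
	 */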
private static void applySharedDDC1Dictionary(List<ColGroup> colGroups, Dictionary dict) {
// create joint mapping table
HashMap<Double, Integer> map = new HashMap<>();
double[] values = dict.getValues();
for(int i = 0; i < values.length; i++)
map.put(values[i], i);
// recode data of all relevant DDC1 groups
		for(ColGroup grp : colGroups) {
			if(grp.getNumCols() == 1 && grp instanceof ColGroupDDC1) {
				ColGroupDDC1 grpDDC1 = (ColGroupDDC1) grp;
				grpDDC1.recodeData(map);
				grpDDC1.setDictionary(dict);
			}
		}
}
}