blob: 2deb2bbd28fea203780b3798f9ca841caeda9f17 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.sysds.runtime.compress.colgroup;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.runtime.DMLCompressionException;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.compress.BitmapEncoder;
import org.apache.sysds.runtime.compress.CompressionSettings;
import org.apache.sysds.runtime.compress.cocode.PlanningCoCoder.PartitionerType;
import org.apache.sysds.runtime.compress.colgroup.ColGroup.CompressionType;
import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimator;
import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimatorExact;
import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
import org.apache.sysds.runtime.compress.utils.ABitmap;
import org.apache.sysds.runtime.util.CommonThreadPool;
/**
 * Factory pattern for constructing ColGroups.
 */
public class ColGroupFactory {
private static final Log LOG = LogFactory.getLog(ColGroupFactory.class.getName());
* The actual compression method, that handles the logic of compressing multiple columns together. This method also
* have the responsibility of correcting any estimation errors previously made.
* @param in The input matrix, that could have been transposed if CompSettings was set to do that
* @param compRatios The previously computed Compression ratings of individual col indexes.
* @param groups The column groups to consider compressing together.
* @param compSettings The compression settings to construct the compression based on.
* @param k The degree of parallelism used.
* @return A Resulting array of ColGroups, containing the compressed information from the input matrix block.
public static ColGroup[] compressColGroups(MatrixBlock in, HashMap<Integer, Double> compRatios, List<int[]> groups,
CompressionSettings compSettings, int k) {
if(k <= 1) {
return compressColGroups(in, compRatios, groups, compSettings);
else {
try {
ExecutorService pool = CommonThreadPool.get(k);
ArrayList<CompressTask> tasks = new ArrayList<>();
for(int[] colIndexes : groups)
tasks.add(new CompressTask(in, compRatios, colIndexes, compSettings));
List<Future<ColGroup>> rtask = pool.invokeAll(tasks);
ArrayList<ColGroup> ret = new ArrayList<>();
for(Future<ColGroup> lrtask : rtask)
return ret.toArray(new ColGroup[groups.size()]);
catch(InterruptedException | ExecutionException e) {
// If there is an error in the parallel execution default to the non parallel implementation
return compressColGroups(in, compRatios, groups, compSettings);
private static ColGroup[] compressColGroups(MatrixBlock in, HashMap<Integer, Double> compRatios, List<int[]> groups,
CompressionSettings compSettings) {
ColGroup[] ret = new ColGroup[groups.size()];
for(int i = 0; i < groups.size(); i++)
ret[i] = compressColGroup(in, compRatios, groups.get(i), compSettings);
return ret;
private static class CompressedColumn implements Comparable<CompressedColumn> {
final int colIx;
final double compRatio;
public CompressedColumn(int colIx, double compRatio) {
this.colIx = colIx;
this.compRatio = compRatio;
public static PriorityQueue<CompressedColumn> makePriorityQue(HashMap<Integer, Double> compRatios,
int[] colIndexes) {
PriorityQueue<CompressedColumn> compRatioPQ;
// first modification
compRatioPQ = new PriorityQueue<>();
for(int i = 0; i < colIndexes.length; i++)
compRatioPQ.add(new CompressedColumn(i, compRatios.get(colIndexes[i])));
return compRatioPQ;
public int compareTo(CompressedColumn o) {
return (int) Math.signum(compRatio - o.compRatio);
private static class CompressTask implements Callable<ColGroup> {
private final MatrixBlock _in;
private final HashMap<Integer, Double> _compRatios;
private final int[] _colIndexes;
private final CompressionSettings _compSettings;
protected CompressTask(MatrixBlock in, HashMap<Integer, Double> compRatios, int[] colIndexes,
CompressionSettings compSettings) {
_in = in;
_compRatios = compRatios;
_colIndexes = colIndexes;
_compSettings = compSettings;
public ColGroup call() {
return compressColGroup(_in, _compRatios, _colIndexes, _compSettings);
/**
 * Compresses one group of columns. The group is evaluated as a whole; if it does not compress well enough, the
 * column with the lowest compression ratio is dropped and the remaining columns are evaluated again.
 *
 * @param in           The (maybe transposed) input matrix
 * @param compRatios   Map from column id to the previously computed compression ratio
 * @param colIndexes   The column ids of the group to compress
 * @param compSettings The compression settings to use
 * @return The compressed ColGroup, or null once the last remaining column has been dropped
 * @throws DMLRuntimeException if the minimum size reported for the group is 0
 */
private static ColGroup compressColGroup(MatrixBlock in, HashMap<Integer, Double> compRatios, int[] colIndexes,
CompressionSettings compSettings) {
// Copy of the full group; dropped columns are marked with -1 at their original position.
int[] allGroupIndices = colIndexes.clone();
CompressedSizeInfoColGroup sizeInfo;
// The compression type is decided based on a full bitmap since it
// will be reused for the actual compression step.
ABitmap ubm = null;
// Queue ordered by ascending compression ratio; each entry carries a position into the original colIndexes.
PriorityQueue<CompressedColumn> compRatioPQ = CompressedColumn.makePriorityQue(compRatios, colIndexes);
// Switching to exact estimator here, when doing the actual compression.
CompressedSizeEstimator estimator = new CompressedSizeEstimatorExact(in, compSettings, compSettings.transposed);
while(true) {
// STEP 1.
// Extract the entire input column list and observe compression ratio
ubm = BitmapEncoder.extractBitmap(colIndexes, in, compSettings.transposed);
// NOTE(review): the constructor call below is cut off after its first argument in this copy of the file;
// the remaining argument(s) and the closing parenthesis have to be restored from the upstream source.
sizeInfo = new CompressedSizeInfoColGroup(estimator.estimateCompressedColGroupSize(ubm),
// Throw error if for some reason the compression observed is 0.
if(sizeInfo.getMinSize() == 0) {
throw new DMLRuntimeException("Size info of compressed Col Group is 0");
// STEP 2.
// Calculate the compression ratio compared to an uncompressed ColGroup type.
// NOTE(review): if both getters return an integral type this is an integer division and the ratio is
// truncated before the comparison with 1.0 below -- confirm the return types.
double compRatio = sizeInfo.getCompressionSize(CompressionType.UNCOMPRESSED) / sizeInfo.getMinSize();
// STEP 3.
// Finish the search and close this compression if the group show good compression.
// Seems a little early to stop here. Maybe reconsider how to decide when to stop.
// Also when comparing to the case of 1.0 compression ratio, it could be that we chose to compress a group
// worse than the individual columns.
// Furthermore performance of a compressed representation that does not compress much, is detrimental to
// overall performance.
// With the COST partitioner the group is compressed regardless of the observed ratio.
if(compRatio > 1.0 || compSettings.columnPartitioner == PartitionerType.COST) {
int rlen = compSettings.transposed ? in.getNumColumns() : in.getNumRows();
return compress(colIndexes, rlen, ubm, sizeInfo.getBestCompressionType(), compSettings, in);
else {
// STEP 4.
// Try to remove the least compressible column from the columns to compress.
// Then repeat from Step 1.
// poll() yields the entry with the smallest ratio; its colIx is a position in allGroupIndices.
allGroupIndices[compRatioPQ.poll().colIx] = -1;
// Only one column was left and it was just dropped: nothing remains to compress.
if(colIndexes.length - 1 == 0) {
return null;
colIndexes = new int[colIndexes.length - 1];
// copying the values that do not equal -1
int ix = 0;
for(int col : allGroupIndices)
if(col != -1)
colIndexes[ix++] = col;
* Method for compressing an ColGroup.
* @param colIndexes The Column indexes to compress
* @param rlen The number of rows in the columns
* @param ubm The Bitmap containing all the data needed for the compression (unless Uncompressed
* ColGroup)
* @param compType The CompressionType selected
* @param cs The compression Settings used for the given compression
* @param rawMatrixBlock The copy of the original input (maybe transposed) MatrixBlock
* @return A Compressed ColGroup
public static ColGroup compress(int[] colIndexes, int rlen, ABitmap ubm, CompressionType compType,
CompressionSettings cs, MatrixBlock rawMatrixBlock) {
switch(compType) {
case DDC:
if(ubm.getNumValues() < 256) {
return new ColGroupDDC1(colIndexes, rlen, ubm, cs);
else {
return new ColGroupDDC2(colIndexes, rlen, ubm, cs);
case RLE:
return new ColGroupRLE(colIndexes, rlen, ubm, cs);
case OLE:
return new ColGroupOLE(colIndexes, rlen, ubm, cs);
return new ColGroupUncompressed(colIndexes, rawMatrixBlock, cs.transposed);
throw new DMLCompressionException("Not implemented ColGroup Type compressed in factory.");
* Method for producing the final ColGroupList stored inside the CompressedMatrixBlock.
* TODO Redesign this method such that it does not utilize the null pointers to decide on which ColGroups should be
* incompressable. This is done by changing both this method and compressColGroup inside this class.
* @param numCols The number of columns in input matrix
* @param colGroups The colgroups made to assign
* @param rawBlock The (maybe transposed) original MatrixBlock
* @param compSettings The Compressionsettings used.
* @return return the final ColGroupList.
public static List<ColGroup> assignColumns(int numCols, ColGroup[] colGroups, MatrixBlock rawBlock,
CompressionSettings compSettings) {
List<ColGroup> _colGroups = new ArrayList<>();
HashSet<Integer> remainingCols = seq(0, numCols - 1, 1);
for(int j = 0; j < colGroups.length; j++) {
if(colGroups[j] != null) {
for(int col : colGroups[j].getColIndices())
if(!remainingCols.isEmpty()) {
int[] list = -> i).toArray();
LOG.warn("Uncompressable Columns: " + Arrays.toString(list));
ColGroupUncompressed ucgroup = new ColGroupUncompressed(list, rawBlock, compSettings.transposed);
return _colGroups;
private static HashSet<Integer> seq(int from, int to, int incr) {
HashSet<Integer> ret = new HashSet<>();
for(int i = from; i <= to; i += incr)
return ret;