blob: 509b3407e12f3a1c4c72fd09bd64b8d7610d4503 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sysds.runtime.compress.estim;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.runtime.compress.CompressionSettings;
import org.apache.sysds.runtime.compress.colgroup.ColGroup.CompressionType;
import org.apache.sysds.runtime.compress.colgroup.ColGroupSizes;
import org.apache.sysds.runtime.compress.utils.AbstractBitmap;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.util.CommonThreadPool;
/**
 * Main abstract class for estimating size of compressions on columns.
 *
 * Concrete estimators implement {@link #estimateCompressedColGroupSize(int[])}; this class supplies the per-column
 * driver (single threaded or parallel) and the aggregation of the per-column results into a
 * {@link CompressedSizeInfo}.
 */
public abstract class CompressedSizeEstimator {

	protected static final Log LOG = LogFactory.getLog(CompressedSizeEstimator.class.getName());

	/** The Matrix Block to extract the compression estimates from */
	protected MatrixBlock _data;
	/** The number of rows in the matrix block, extracted to a field because the matrix could be transposed */
	protected final int _numRows;
	/** The number of columns in the matrix block, extracted to a field because the matrix could be transposed */
	protected final int _numCols;
	/** The compression settings to use, for estimating the size, and compress the ColGroups. */
	protected final CompressionSettings _compSettings;

	/**
	 * Main Constructor for Compression Estimator.
	 *
	 * protected because the factory should be used to construct the CompressedSizeEstimator
	 *
	 * When {@code compSettings.transposeInput} is set, the row and column counts of {@code data} are swapped so that
	 * {@code _numRows} and {@code _numCols} always describe the logical (non-transposed) matrix.
	 *
	 * @param data         The matrix block to extract information from
	 * @param compSettings The Compression settings used.
	 */
	protected CompressedSizeEstimator(MatrixBlock data, CompressionSettings compSettings) {
		_data = data;
		_numRows = compSettings.transposeInput ? _data.getNumColumns() : _data.getNumRows();
		_numCols = compSettings.transposeInput ? _data.getNumRows() : _data.getNumColumns();
		_compSettings = compSettings;
	}

	/**
	 * Extracts the Compression Size info of every column, each column estimated in isolation.
	 *
	 * The estimation runs in parallel when {@code k > 1} and sequentially otherwise.
	 *
	 * @param k The concurrency degree.
	 * @return The Compression Size info of each Column compressed isolated.
	 */
	public CompressedSizeInfo computeCompressedSizeInfos(int k) {
		CompressedSizeInfoColGroup[] sizeInfos = estimateIndividualColumnGroupSizes(k);
		return computeCompressedSizeInfos(sizeInfos);
	}

	/**
	 * Extracts the CompressedSizeInfo for a list of ColGroups. The Compression Ratio is based on a Dense Uncompressed
	 * Double Vector for each of the columns.
	 *
	 * Internally it loops through all the columns, and selects the best compression colGroup for that column. Even if
	 * that is an UncompressedColGroup.
	 *
	 * @param sizeInfos The size information of each of the Column Groups, indexed by column.
	 * @return A CompressedSizeInfo object containing the information of the best column groups for individual columns.
	 */
	private CompressedSizeInfo computeCompressedSizeInfos(CompressedSizeInfoColGroup[] sizeInfos) {
		List<Integer> colsC = new ArrayList<>();
		List<Integer> colsUC = new ArrayList<>();
		HashMap<Integer, Double> compRatios = new HashMap<>();

		// Baseline (numerator) of every compression ratio: the dense uncompressed size estimate.
		// NOTE(review): the call passes (_numCols, _numRows) although the ratio is computed per column; confirm the
		// argument order and intent against ColGroupSizes.estimateInMemorySizeUncompressed.
		double unCompressedDenseSize = ColGroupSizes.estimateInMemorySizeUncompressed(_numCols, _numRows, 1.0);

		int nnzUCSum = 0;
		for(int col = 0; col < _numCols; col++) {
			double minCompressedSize = (double) sizeInfos[col].getMinSize();
			double compRatio = unCompressedDenseSize / minCompressedSize;
			// The ratio is recorded for every column, compressible or not.
			compRatios.put(col, compRatio);

			// If the best compression is achieved in an UnCompressed colGroup it is usually because it is a sparse
			// ColGroup
			if(sizeInfos[col].getBestCompressionType() == CompressionType.UNCOMPRESSED) {
				colsUC.add(col);
				nnzUCSum += sizeInfos[col].getEstNnz();
			}
			else {
				colsC.add(col);
			}
		}
		return new CompressedSizeInfo(sizeInfos, colsC, colsUC, compRatios, nnzUCSum);
	}

	/**
	 * Estimates every column as its own single-column group, choosing the parallel or sequential driver.
	 *
	 * @param k The concurrency degree; values above 1 select the parallel driver.
	 * @return One size info per column, indexed by column.
	 */
	private CompressedSizeInfoColGroup[] estimateIndividualColumnGroupSizes(int k) {
		return (k > 1) ? estimateColumnsParallel(_numCols, k) : estimateColumnsSequential(_numCols);
	}

	/**
	 * Method used for compressing into one type of colGroup
	 *
	 * @return CompressedSizeInfo on a compressed colGroup compressing the entire matrix into a single colGroup type.
	 */
	public CompressedSizeInfoColGroup estimateCompressedColGroupSize() {
		int[] colIndexes = makeColIndexes();
		return estimateCompressedColGroupSize(colIndexes);
	}

	/**
	 * Abstract method for extracting Compressed Size Info of specified columns, together in a single ColGroup
	 *
	 * @param colIndexes The Columns to group together inside a ColGroup
	 * @return The CompressedSizeInformation associated with the selected ColGroups.
	 */
	public abstract CompressedSizeInfoColGroup estimateCompressedColGroupSize(int[] colIndexes);

	/**
	 * Method used to extract the CompressedSizeEstimationFactors from a constructed UncompressedBitmap. Note this
	 * method works both for the sample based estimator and the exact estimator, since the bitmap can be extracted
	 * from a sample or from the entire dataset.
	 *
	 * The factors are computed with this estimator's row count, the bitmap's own column count, and a flag telling
	 * whether RLE is among the valid compressions of the settings.
	 *
	 * @param ubm the UncompressedBitmap, either extracted from a sample or from the entire dataset
	 * @return The size factors estimated from the Bit Map.
	 */
	public EstimationFactors estimateCompressedColGroupSize(AbstractBitmap ubm) {
		return EstimationFactors.computeSizeEstimationFactors(ubm,
			_compSettings.validCompressions.contains(CompressionType.RLE),
			_numRows,
			ubm.getNumColumns());
	}

	// ------------------------------------------------
	// PARALLEL CODE
	// ------------------------------------------------

	/**
	 * Sequential driver: estimates each of the first {@code clen} columns as a single-column group.
	 *
	 * @param clen The number of columns to estimate.
	 * @return One size info per column, indexed by column.
	 */
	private CompressedSizeInfoColGroup[] estimateColumnsSequential(int clen) {
		CompressedSizeInfoColGroup[] ret = new CompressedSizeInfoColGroup[clen];
		for(int col = 0; col < clen; col++)
			ret[col] = estimateCompressedColGroupSize(new int[] {col});
		return ret;
	}

	/**
	 * Parallel driver: submits one estimation task per column to a thread pool of degree {@code k}.
	 *
	 * If the parallel execution is interrupted or a task fails, the failure is logged and the estimation is redone
	 * sequentially. The pool is shut down on every path, and an interrupt is restored on the calling thread once
	 * the fallback has produced its result.
	 *
	 * @param clen The number of columns to estimate.
	 * @param k    The concurrency degree used to obtain the thread pool.
	 * @return One size info per column, indexed by column.
	 */
	private CompressedSizeInfoColGroup[] estimateColumnsParallel(int clen, int k) {
		ExecutorService pool = CommonThreadPool.get(k);
		try {
			List<SizeEstimationTask> tasks = new ArrayList<>(clen);
			for(int col = 0; col < clen; col++)
				tasks.add(new SizeEstimationTask(this, col));

			// invokeAll returns the futures in the same order as the submitted tasks, i.e. in column order.
			List<Future<CompressedSizeInfoColGroup>> futures = pool.invokeAll(tasks);
			CompressedSizeInfoColGroup[] ret = new CompressedSizeInfoColGroup[clen];
			for(int col = 0; col < clen; col++)
				ret[col] = futures.get(col).get();
			return ret;
		}
		catch(InterruptedException e) {
			LOG.warn("Parallel size estimation interrupted, falling back to single threaded estimation", e);
			CompressedSizeInfoColGroup[] ret = estimateColumnsSequential(clen);
			// Restore the interrupt status only after the fallback, so the fallback itself is unaffected while the
			// caller can still observe the interruption.
			Thread.currentThread().interrupt();
			return ret;
		}
		catch(ExecutionException e) {
			LOG.warn("Parallel size estimation failed, falling back to single threaded estimation", e);
			return estimateColumnsSequential(clen);
		}
		finally {
			// Release the pool on failure paths as well, not just after a successful run.
			pool.shutdown();
		}
	}

	/** Task estimating the compressed size of one column as a single-column group. */
	private static class SizeEstimationTask implements Callable<CompressedSizeInfoColGroup> {
		private final CompressedSizeEstimator _estimator;
		private final int _col;

		protected SizeEstimationTask(CompressedSizeEstimator estimator, int col) {
			_estimator = estimator;
			_col = col;
		}

		@Override
		public CompressedSizeInfoColGroup call() {
			return _estimator.estimateCompressedColGroupSize(new int[] {_col});
		}
	}

	// ------------------------------------------------
	// PARALLEL CODE END
	// ------------------------------------------------

	// UTIL

	/**
	 * Builds the identity column index array {@code [0, 1, ..., _numCols - 1]}.
	 *
	 * @return The indexes of all columns.
	 */
	private int[] makeColIndexes() {
		int[] colIndexes = new int[_numCols];
		for(int i = 0; i < _numCols; i++) {
			colIndexes[i] = i;
		}
		return colIndexes;
	}
}