| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.sysds.runtime.compress.estim; |
| |
| import java.util.Arrays; |
| import java.util.Random; |
| |
| import org.apache.sysds.runtime.compress.CompressionSettings; |
| import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex; |
| import org.apache.sysds.runtime.compress.estim.encoding.EncodingFactory; |
| import org.apache.sysds.runtime.compress.estim.encoding.IEncode; |
| import org.apache.sysds.runtime.compress.estim.sample.SampleEstimatorFactory; |
| import org.apache.sysds.runtime.controlprogram.parfor.stat.Timing; |
| import org.apache.sysds.runtime.data.DenseBlock; |
| import org.apache.sysds.runtime.data.SparseBlock; |
| import org.apache.sysds.runtime.data.SparseBlockMCSR; |
| import org.apache.sysds.runtime.data.SparseRow; |
| import org.apache.sysds.runtime.matrix.data.LibMatrixReorg; |
| import org.apache.sysds.runtime.matrix.data.MatrixBlock; |
| |
| /** |
| * Estimates the compressed size of the input based on a subsample of the data. |
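| * <p> |
| * Illustrative usage (a sketch only; {@code CompressionSettingsBuilder} and {@code ColIndexFactory} |
| * are assumed from the surrounding compression packages, and the argument values are arbitrary): |
| * |
| * <pre>{@code |
| * MatrixBlock mb = ...; // the input data |
| * CompressionSettings cs = new CompressionSettingsBuilder().create(); |
| * int sampleSize = Math.min(mb.getNumRows(), 4096); |
| * ComEstSample est = new ComEstSample(mb, cs, sampleSize, 4); // 4 threads |
| * IColIndex cols = ColIndexFactory.create(2); // the first two columns |
| * CompressedSizeInfoColGroup info = est.getColGroupInfo(cols, 0, mb.getNumRows()); |
| * }</pre> |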
| */ |
| public class ComEstSample extends AComEst { |
| |
| /** Sample extracted from the input data */ |
| private final MatrixBlock _sample; |
| /** Parallelization degree */ |
| private final int _k; |
| /** Sample size */ |
| private final int _sampleSize; |
| /** Boolean specifying if the sample is in transposed format. */ |
| private boolean _transposed; |
| |
| /** |
| * Sample-based size estimator: draws a sample from the input data and estimates the size of the |
| * compressed matrix from it. |
| * |
| * @param data The input data to sample from |
| * @param cs The settings used for the sampling and compression; contains information such as the seed |
| * @param sampleSize The number of rows to sample from the data |
| * @param k The parallelization degree allowed |
| */ |
| public ComEstSample(MatrixBlock data, CompressionSettings cs, int sampleSize, int k) { |
| super(data, cs); |
| _k = k; |
| _sampleSize = sampleSize; |
| _transposed = _cs.transposed; |
| Timing time = LOG.isDebugEnabled() ? new Timing(true) : null; |
| _sample = sampleData(sampleSize); |
| if(time != null) |
| LOG.debug("Sampling time: " + time.stop()); |
| |
| } |
| |
| @Override |
| public CompressedSizeInfoColGroup getColGroupInfo(IColIndex colIndexes, int estimate, int maxDistinct) { |
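| // Shortcut: an empty input, or a single column known to be empty, compresses to an empty column group. |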
| if(_data.isEmpty() || (nnzCols != null && colIndexes.size() == 1 && nnzCols[colIndexes.get(0)] == 0) || |
| (_cs.transposed && colIndexes.size() == 1 && _data.isInSparseFormat() && |
| _data.getSparseBlock().isEmpty(colIndexes.get(0)))) |
| return new CompressedSizeInfoColGroup(colIndexes, getNumRows()); |
| |
| final IEncode map = EncodingFactory.createFromMatrixBlock(_sample, _transposed, colIndexes); |
| return extractInfo(map, colIndexes, maxDistinct); |
| } |
| |
| @Override |
| public CompressedSizeInfoColGroup getDeltaColGroupInfo(IColIndex colIndexes, int estimate, int maxDistinct) { |
| // Do not use the sample when estimating delta encoding; instead, read from the start of the |
| // matrix up to the sample size. This guarantees that the delta values used for estimation |
| // actually occur in the full compression. |
| final IEncode map = EncodingFactory.createFromMatrixBlockDelta(_data, _transposed, colIndexes, _sampleSize); |
| return extractInfo(map, colIndexes, maxDistinct); |
| } |
| |
| @Override |
| protected int worstCaseUpperBound(IColIndex columns) { |
| if(getNumColumns() == columns.size()) |
| return Math.min(getNumRows(), (int) _data.getNonZeros()); |
| return getNumRows(); |
| } |
| |
| @Override |
| protected CompressedSizeInfoColGroup combine(IColIndex combinedColumns, CompressedSizeInfoColGroup g1, |
| CompressedSizeInfoColGroup g2, int maxDistinct) { |
| final IEncode map = g1.getMap().combine(g2.getMap()); |
| return extractInfo(map, combinedColumns, maxDistinct); |
| } |
| |
| private CompressedSizeInfoColGroup extractInfo(IEncode map, IColIndex colIndexes, int maxDistinct) { |
| final double spar = _data.getSparsity(); |
| final EstimationFactors sampleFacts = map.extractFacts(_sampleSize, spar, spar, _cs); |
| final EstimationFactors em = scaleFactors(sampleFacts, colIndexes, maxDistinct, map.isDense()); |
| return new CompressedSizeInfoColGroup(colIndexes, em, _cs.validCompressions, map); |
| } |
| |
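| /** |
| * Scale the facts extracted from the sample up to the full matrix. The linear scaling factor |
| * is numRows / sampleSize; for example, with 1,000,000 rows and a sample of 10,000 rows, every |
| * sampled offset count is scaled by 100, while the distinct count is scaled sublinearly by a |
| * dedicated sample estimator. |
| */ |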
| private EstimationFactors scaleFactors(EstimationFactors sampleFacts, IColIndex colIndexes, int maxDistinct, |
| boolean dense) { |
| final int numRows = getNumRows(); |
| final int nCol = colIndexes.size(); |
| |
| final double scalingFactor = (double) numRows / _sampleSize; |
| |
| final long nnz = calculateNNZ(colIndexes, scalingFactor); |
| final int numOffs = calculateOffs(sampleFacts, numRows, scalingFactor, colIndexes, (int) nnz); |
| final int estDistinct = distinctCountScale(sampleFacts, numOffs, numRows, maxDistinct, dense, nCol); |
| |
| // Bound the count of the most frequent tuple: at most numRows - estDistinct + 1, at least numRows - numOffs. |
| final int maxLargestInstanceCount = numRows - estDistinct + 1; |
| final int scaledLargestInstanceCount = sampleFacts.largestOff < 0 ? numOffs / |
| estDistinct : (int) Math.floor(sampleFacts.largestOff * scalingFactor); |
| final int mostFrequentOffsetCount = Math.max(Math.min(maxLargestInstanceCount, scaledLargestInstanceCount), |
| numRows - numOffs); |
| |
| final double overallSparsity = calculateSparsity(colIndexes, nnz, scalingFactor, sampleFacts.overAllSparsity); |
| // For robustness, pad the tuple sparsity by 30 percent, capped at 1.0. |
| final double tupleSparsity = Math.min(overallSparsity * 1.3, 1.0); |
| |
| if(_cs.isRLEAllowed()) { |
| final int scaledRuns = Math.max(estDistinct, calculateRuns(sampleFacts, scalingFactor, numOffs, estDistinct)); |
| return new EstimationFactors(estDistinct, numOffs, mostFrequentOffsetCount, sampleFacts.frequencies, |
| sampleFacts.numSingle, numRows, scaledRuns, sampleFacts.lossy, sampleFacts.zeroIsMostFrequent, |
| overallSparsity, tupleSparsity); |
| } |
| else |
| return new EstimationFactors(estDistinct, numOffs, mostFrequentOffsetCount, sampleFacts.frequencies, |
| sampleFacts.numSingle, numRows, sampleFacts.lossy, sampleFacts.zeroIsMostFrequent, overallSparsity, |
| tupleSparsity); |
| |
| } |
| |
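| /** |
| * Estimate the number of distinct tuples in the full matrix from the sampled frequencies, |
| * bounded below by 1 and above by both maxDistinct and the estimated number of offsets. |
| */ |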
| private int distinctCountScale(EstimationFactors sampleFacts, int numOffs, int numRows, int maxDistinct, |
| boolean dense, int nCol) { |
| // The frequencies of the non-empty entries observed in the sample. |
| final int[] freq = sampleFacts.frequencies; |
| if(freq == null || freq.length == 0) |
| return numOffs; // fall back to a very aggressive distinct count |
| // The sampled size is smaller than the nominal sample size if there were empty rows, |
| // and the tighter this value is, the more accurate the estimate becomes. |
| final int sampledSize = sampleFacts.numOffs; |
| int est = SampleEstimatorFactory.distinctCount(freq, dense ? numRows : numOffs, sampledSize, _cs.estimationType); |
| if(est > 10000) // heuristic: inflate large estimates by 50 percent to be conservative |
| est += est * 0.5; |
| if(nCol > 4) // increase the estimate when co-coding many columns, to be safe |
| est += ((double) est) * ((double) nCol) / 10; |
| // Bound the estimate with the maxDistinct. |
| return Math.max(Math.min(est, Math.min(maxDistinct, numOffs)), 1); |
| } |
| |
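| /** |
| * Estimate the number of non-empty rows (offsets) in the selected columns by scaling the |
| * number of empty tuples observed in the sample, preferring exact per-column counts when known. |
| */ |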
| private int calculateOffs(EstimationFactors sampleFacts, int numRows, double scalingFactor, IColIndex colIndexes, |
| int nnz) { |
| |
| if(getNumColumns() == 1) |
| return nnz; |
| else if(nnzCols != null) { |
| if(colIndexes.size() == 1) |
| return nnzCols[colIndexes.get(0)]; |
| else { |
| final int emptyTuples = sampleFacts.numRows - sampleFacts.numOffs; |
| final int estOffs = numRows - (int) Math.floor(emptyTuples * scalingFactor); |
| return Math.min(nnz, estOffs); |
| } |
| } |
| else { |
| final int emptyTuples = sampleFacts.numRows - sampleFacts.numOffs; |
| return numRows - (int) Math.floor(emptyTuples * scalingFactor); |
| } |
| } |
| |
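| /** |
| * Estimate the number of value runs (for RLE) in the full matrix from the sampled run count. |
| */ |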
| private int calculateRuns(EstimationFactors sampleFacts, double scalingFactor, int estOffs, int estDistinct) { |
| // Naive approach: derive the full run count from the run statistics of the sample. |
| final double nRunsInSample = sampleFacts.numRuns; |
| |
| // If the sample shows close to one run per distinct value, and runs are rare relative to |
| // the sample size, scale the runs-per-value ratio by the estimated distinct count; |
| // otherwise, simply scale the sampled run count linearly. |
| final double sampleToRunRatio = nRunsInSample / sampleFacts.numVals; |
| final double sampleSizeToRunRatio = nRunsInSample / _sampleSize; |
| double numRuns = (sampleToRunRatio <= 1.1 && sampleSizeToRunRatio < 0.5) ? // decide estimation model |
| sampleToRunRatio * estDistinct : // runs per value in the sample, scaled to the estimated distinct count |
| nRunsInSample * scalingFactor; // simply scale the sampled run count |
| |
| // With an estimated num runs, we now bound it. |
| numRuns = Math.min(numRuns, estOffs); // max number of runs equal to estimated offsets |
| numRuns = Math.max(numRuns, estDistinct); // at least one run per distinct value |
| numRuns = Math.min(Integer.MAX_VALUE, Math.ceil(numRuns)); |
| |
| return (int) numRuns; |
| } |
| |
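| /** |
| * Derive the overall sparsity of the selected columns, preferring exact non-zero counts over |
| * the sparsity observed in the sample. |
| */ |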
| private double calculateSparsity(IColIndex colIndexes, long nnz, double scalingFactor, double sampleValue) { |
| if(colIndexes.size() == getNumColumns()) |
| return _data.getSparsity(); |
| else if(nnzCols != null || (_cs.transposed && _data.isInSparseFormat()) || |
| (_transposed && _sample.isInSparseFormat())) |
| return (double) nnz / (getNumRows() * colIndexes.size()); |
| else if(_sample.isEmpty()) |
| // Make a semi safe bet of using the data input sparsity if the sample was empty. |
| return _data.getSparsity(); |
| else |
| return sampleValue; |
| } |
| |
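| /** |
| * Estimate the number of non-zeros in the selected columns, using exact counts when available |
| * and scaled sample counts otherwise. |
| */ |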
| private long calculateNNZ(IColIndex colIndexes, double scalingFactor) { |
| if(colIndexes.size() == getNumColumns()) |
| return _data.getNonZeros(); |
| else if(_cs.transposed && _data.isInSparseFormat()) { |
| // Use exact counts if possible (index via colIndexes, and guard against empty rows of the transposed block) |
| long nnzCount = 0; |
| SparseBlock sb = _data.getSparseBlock(); |
| for(int i = 0; i < colIndexes.size(); i++) |
| if(!sb.isEmpty(colIndexes.get(i))) |
| nnzCount += sb.get(colIndexes.get(i)).size(); |
| return nnzCount; |
| } |
| else if(nnzCols != null) { |
| long nnz = 0; |
| for(int i = 0; i < colIndexes.size(); i++) |
| nnz += nnzCols[colIndexes.get(i)]; |
| return nnz; |
| } |
| else if(_sample.isEmpty()) |
| return 0; |
| else if(_transposed && _sample.isInSparseFormat()) { |
| // Fall back to the transposed sample, whose rows correspond to the original columns |
| long nnzCount = 0; |
| SparseBlock sb = _sample.getSparseBlock(); |
| for(int i = 0; i < colIndexes.size(); i++) |
| if(!sb.isEmpty(colIndexes.get(i))) |
| nnzCount += sb.get(colIndexes.get(i)).size() * scalingFactor; |
| |
| // Ensure a non-zero count per column so that uncompressed columns are considered to contain at least one value. |
| if(nnzCount == 0) |
| nnzCount += colIndexes.size(); |
| return nnzCount; |
| } |
| else |
| // if all others aren't available use the samples value. |
| return _sample.getNonZeros(); |
| } |
| |
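| /** |
| * Draw a sorted, uniform sample of indexes in the range [0, range) via reservoir sampling. |
| * |
| * @param range The exclusive upper bound of the index range to sample from |
| * @param sampleSize The number of indexes to draw |
| * @param seed The seed for the random number generator |
| * @param k The parallelization degree; if greater than 1, the sort is parallelized |
| * @return A sorted array of sampled indexes |
| */ |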
| public static int[] getSortedSample(int range, int sampleSize, long seed, int k) { |
| // allocate the sample index array |
| final int[] a = new int[sampleSize]; |
| |
| Random r = new Random(seed); |
| // reservoir sampling (Algorithm R): fill the reservoir with the first sampleSize indexes, |
| // then include index i with probability sampleSize / (i + 1) |
| for(int i = 0; i < sampleSize; i++) |
| a[i] = i; |
| |
| for(int i = sampleSize; i < range; i++) |
| if(r.nextInt(i + 1) < sampleSize) |
| a[r.nextInt(sampleSize)] = i; |
| |
| if(range / 100 < sampleSize) { |
| // randomize the sample order (Algorithm P, the Fisher-Yates shuffle, from Knuth's TAOCP), |
| // needed especially when the difference between range and sampleSize is small |
| for(int i = 0; i < sampleSize - 1; i++) { |
| // generate index in i <= j < sampleSize |
| int j = r.nextInt(sampleSize - i) + i; |
| // swap i^th and j^th entry |
| int tmp = a[i]; |
| a[i] = a[j]; |
| a[j] = tmp; |
| } |
| } |
| |
| // Sort the sample |
| if(k > 1) |
| Arrays.parallelSort(a); |
| else |
| Arrays.sort(a); |
| return a; |
| } |
| |
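| /** |
| * Extract the sample from the input data, selecting the cheapest path for the input format. |
| * The sparse and dense paths store the sample transposed; the slow fallback path preserves |
| * the orientation of the input. |
| */ |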
| private MatrixBlock sampleData(int sampleSize) { |
| |
| final int[] sampleRows = ComEstSample.getSortedSample(getNumRows(), sampleSize, _cs.seed, _k); |
| MatrixBlock sampledMatrixBlock; |
| if(!_cs.transposed) { |
| if(_data.isInSparseFormat()) |
| sampledMatrixBlock = sparseNotTransposedSamplePath(sampleRows); |
| else |
| sampledMatrixBlock = denseSamplePath(sampleRows); |
| } |
| else |
| sampledMatrixBlock = defaultSlowSamplingPath(sampleRows); |
| |
| return sampledMatrixBlock; |
| } |
| |
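| /** |
| * Sparse, not-transposed path: shallow-copy the sampled sparse rows into an MCSR block and |
| * transpose in place, avoiding a per-value copy of the input. |
| */ |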
| private MatrixBlock sparseNotTransposedSamplePath(int[] sampleRows) { |
| MatrixBlock res = new MatrixBlock(sampleRows.length, _data.getNumColumns(), true); |
| SparseRow[] rows = new SparseRow[sampleRows.length]; |
| SparseBlock in = _data.getSparseBlock(); |
| for(int i = 0; i < sampleRows.length; i++) |
| rows[i] = in.get(sampleRows[i]); |
| |
| res.setSparseBlock(new SparseBlockMCSR(rows, false)); |
| res.recomputeNonZeros(); |
| _transposed = true; // the sample is stored transposed from here on |
| res = LibMatrixReorg.transposeInPlace(res, _k); |
| |
| return res; |
| } |
| |
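| /** |
| * Fallback path: build a 0-1 selection vector over the rows (or columns, if transposed) and |
| * slice out the sample via removeEmpty. |
| */ |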
| private MatrixBlock defaultSlowSamplingPath(int[] sampleRows) { |
| MatrixBlock select = (_cs.transposed) ? new MatrixBlock(_data.getNumColumns(), 1, |
| false) : new MatrixBlock(_data.getNumRows(), 1, false); |
| for(int i = 0; i < sampleRows.length; i++) |
| select.appendValue(sampleRows[i], 0, 1); |
| MatrixBlock ret = _data.removeEmptyOperations(new MatrixBlock(), !_cs.transposed, true, select); |
| return ret; |
| } |
| |
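| /** |
| * Dense path: materialize the sample directly in transposed layout (input columns become |
| * sample rows) so that later per-column encoding reads contiguous values. |
| */ |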
| private MatrixBlock denseSamplePath(int[] sampleRows) { |
| final int sampleSize = sampleRows.length; |
| final double sampleRatio = _cs.transposed ? (double) _data.getNumColumns() / |
| sampleSize : (double) _data.getNumRows() / sampleSize; |
| final long inputNonZeros = _data.getNonZeros(); |
| final long estimatedNonZerosInSample = (long) Math.ceil((double) inputNonZeros / sampleRatio); |
| final int resRows = _cs.transposed ? _data.getNumRows() : _data.getNumColumns(); |
| final long nCellsInSample = (long) sampleSize * resRows; |
| final boolean shouldBeSparseSample = (double) estimatedNonZerosInSample / nCellsInSample < 0.4; // sparse if estimated density is below 0.4 |
| MatrixBlock res = new MatrixBlock(resRows, sampleSize, shouldBeSparseSample); |
| res.allocateBlock(); |
| |
| final DenseBlock inb = _data.getDenseBlock(); |
| if(res.isInSparseFormat()) { |
| final SparseBlock resb = res.getSparseBlock(); |
| final SparseBlockMCSR resbmcsr = (SparseBlockMCSR) resb; |
| // cast to double before dividing; otherwise ceil is applied to an already truncated integer division |
| final int estimatedNrDoublesEachRow = (int) Math.max(4, Math.ceil((double) estimatedNonZerosInSample / sampleSize)); |
| for(int col = 0; col < resRows; col++) |
| resbmcsr.allocate(col, estimatedNrDoublesEachRow); |
| |
| for(int row = 0; row < sampleSize; row++) { |
| final int inRow = sampleRows[row]; |
| final double[] inBlockV = inb.values(inRow); |
| final int offIn = inb.pos(inRow); |
| for(int col = 0; col < resRows; col++) { |
| final SparseRow srow = resbmcsr.get(col); |
| srow.append(row, inBlockV[offIn + col]); |
| } |
| } |
| } |
| else { |
| final DenseBlock resb = res.getDenseBlock(); |
| for(int row = 0; row < sampleSize; row++) { |
| final int inRow = sampleRows[row]; |
| final double[] inBlockV = inb.values(inRow); |
| final int offIn = inb.pos(inRow); |
| for(int col = 0; col < resRows; col++) { |
| final double[] blockV = resb.values(col); |
| blockV[col * sampleSize + row] = inBlockV[offIn + col]; |
| } |
| } |
| } |
| res.setNonZeros(estimatedNonZerosInSample); // an estimate is sufficient for size estimation purposes |
| _transposed = true; |
| |
| return res; |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder sb = new StringBuilder(); |
| sb.append(super.toString()); |
| sb.append(" sampleSize: "); |
| sb.append(_sampleSize); |
| sb.append(" transposed: "); |
| sb.append(_transposed); |
| return sb.toString(); |
| } |
| } |