| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.sysds.runtime.compress.colgroup; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Future; |
| |
| import org.apache.commons.lang3.NotImplementedException; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.sysds.runtime.compress.CompressedMatrixBlock; |
| import org.apache.sysds.runtime.compress.CompressionSettings; |
| import org.apache.sysds.runtime.compress.DMLCompressionException; |
| import org.apache.sysds.runtime.compress.bitmap.ABitmap; |
| import org.apache.sysds.runtime.compress.bitmap.BitmapEncoder; |
| import org.apache.sysds.runtime.compress.colgroup.AColGroup.CompressionType; |
| import org.apache.sysds.runtime.compress.colgroup.dictionary.ADictionary; |
| import org.apache.sysds.runtime.compress.colgroup.dictionary.Dictionary; |
| import org.apache.sysds.runtime.compress.colgroup.dictionary.DictionaryFactory; |
| import org.apache.sysds.runtime.compress.colgroup.functional.LinearRegression; |
| import org.apache.sysds.runtime.compress.colgroup.indexes.ColIndexFactory; |
| import org.apache.sysds.runtime.compress.colgroup.indexes.IColIndex; |
| import org.apache.sysds.runtime.compress.colgroup.indexes.IIterate; |
| import org.apache.sysds.runtime.compress.colgroup.insertionsort.AInsertionSorter; |
| import org.apache.sysds.runtime.compress.colgroup.insertionsort.InsertionSorterFactory; |
| import org.apache.sysds.runtime.compress.colgroup.mapping.AMapToData; |
| import org.apache.sysds.runtime.compress.colgroup.mapping.MapToFactory; |
| import org.apache.sysds.runtime.compress.colgroup.offset.AOffset; |
| import org.apache.sysds.runtime.compress.colgroup.offset.OffsetFactory; |
| import org.apache.sysds.runtime.compress.cost.ACostEstimate; |
| import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo; |
| import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup; |
| import org.apache.sysds.runtime.compress.lib.CLALibCombineGroups; |
| import org.apache.sysds.runtime.compress.readers.ReaderColumnSelection; |
| import org.apache.sysds.runtime.compress.utils.ACount.DCounts; |
| import org.apache.sysds.runtime.compress.utils.DblArray; |
| import org.apache.sysds.runtime.compress.utils.DblArrayCountHashMap; |
| import org.apache.sysds.runtime.compress.utils.DoubleCountHashMap; |
| import org.apache.sysds.runtime.compress.utils.IntArrayList; |
| import org.apache.sysds.runtime.controlprogram.parfor.stat.Timing; |
| import org.apache.sysds.runtime.data.DenseBlock; |
| import org.apache.sysds.runtime.data.SparseBlock; |
| import org.apache.sysds.runtime.matrix.data.MatrixBlock; |
| import org.apache.sysds.runtime.util.CommonThreadPool; |
| |
| /** |
| * Factory class for constructing ColGroups. |
| */ |
| public class ColGroupFactory { |
	protected static final Log LOG = LogFactory.getLog(ColGroupFactory.class.getName());

	/** Input matrix to compress */
	private final MatrixBlock in;
	/** Compression information to compress based on */
	private final CompressedSizeInfo csi;
	/** Compression settings specifying for instance if the input is transposed */
	private final CompressionSettings cs;
	/** The cost estimator to use to calculate cost of compression; null when no cost estimation is requested */
	private final ACostEstimate ce;
	/** Parallelization degree */
	private final int k;
	/** number of rows in input (taking into account if the input is transposed) */
	private final int nRow;
	/** number of columns in input (taking into account if the input is transposed) */
	private final int nCol;
	/** Thread pool to use in execution of compression; null when k &lt;= 1 (single-threaded) */
	private final ExecutorService pool;
| |
	/**
	 * Private constructor; use the static compressColGroups methods to compress.
	 *
	 * @param in  The matrix to compress (possibly transposed, as indicated by cs.transposed)
	 * @param csi The estimated compression information for the column groups
	 * @param cs  The compression settings
	 * @param ce  The cost estimator (may be null)
	 * @param k   The parallelization degree
	 */
	private ColGroupFactory(MatrixBlock in, CompressedSizeInfo csi, CompressionSettings cs, ACostEstimate ce, int k) {
		this.in = in;
		this.csi = csi;
		this.cs = cs;
		this.k = k;
		this.ce = ce;

		// logical dimensions: if the input is transposed, rows and columns swap roles
		this.nRow = cs.transposed ? in.getNumColumns() : in.getNumRows();
		this.nCol = cs.transposed ? in.getNumRows() : in.getNumColumns();

		// only allocate a thread pool when parallelism is actually requested
		this.pool = (k > 1) ? CommonThreadPool.get(k) : null;
	}
| |
| /** |
| * The actual compression method, that handles the logic of compressing multiple columns together. |
| * |
| * @param in The input matrix, that could have been transposed. If it is transposed the compSettings should specify |
| * this. |
| * @param csi The compression information extracted from the estimation, this contains which groups of columns to |
| * compress together. |
| * @param cs The compression settings to specify how to compress. |
| * @return A resulting array of ColGroups, containing the compressed information from the input matrix block. |
| */ |
| public static List<AColGroup> compressColGroups(MatrixBlock in, CompressedSizeInfo csi, CompressionSettings cs) { |
| return compressColGroups(in, csi, cs, 1); |
| } |
| |
| /** |
| * The actual compression method, that handles the logic of compressing multiple columns together. |
| * |
| * @param in The input matrix, that could have been transposed. If it is transposed the compSettings should specify |
| * this. |
| * @param csi The compression information extracted from the estimation, this contains which groups of columns to |
| * compress together. |
| * @param cs The compression settings to specify how to compress. |
| * @param k The degree of parallelism to be used in the compression of the column groups. |
| * @return A resulting array of ColGroups, containing the compressed information from the input matrix block. |
| */ |
| public static List<AColGroup> compressColGroups(MatrixBlock in, CompressedSizeInfo csi, CompressionSettings cs, |
| int k) { |
| return new ColGroupFactory(in, csi, cs, null, k).compress(); |
| } |
| |
| /** |
| * |
| * @param in The input matrix, that could have been transposed. If it is transposed the compSettings should specify |
| * this. |
| * @param csi The compression information extracted from the estimation, this contains which groups of columns to |
| * compress together. |
| * @param cs The compression settings to specify how to compress. |
| * @param ce The cost estimator used for the compression |
| * @param k The degree of parallelism to be used in the compression of the column groups. |
| * @return A resulting array of ColGroups, containing the compressed information from the input matrix block. |
| */ |
| public static List<AColGroup> compressColGroups(MatrixBlock in, CompressedSizeInfo csi, CompressionSettings cs, |
| ACostEstimate ce, int k) { |
| return new ColGroupFactory(in, csi, cs, ce, k).compress(); |
| } |
| |
	/**
	 * Dispatch the compression: combine existing groups if the input already is compressed, otherwise run the full
	 * compression. The thread pool, if allocated, is always shut down afterwards.
	 *
	 * @return The list of compressed column groups
	 */
	private List<AColGroup> compress() {
		try {
			if(in instanceof CompressedMatrixBlock)
				return CLALibCombineGroups.combine((CompressedMatrixBlock) in, csi, pool);
			else
				return compressExecute();
		}
		catch(Exception e) {
			// wrap any failure, preserving the original cause
			throw new DMLCompressionException("Compression Failed", e);
		}
		finally {
			if(pool != null)
				pool.shutdown();
		}
	}
| |
| private List<AColGroup> compressExecute() throws Exception { |
| if(in.isEmpty()) { |
| AColGroup empty = ColGroupEmpty.create(cs.transposed ? in.getNumRows() : in.getNumColumns()); |
| return Collections.singletonList(empty); |
| } |
| else if(k <= 1) |
| return compressColGroupsSingleThreaded(); |
| else |
| return compressColGroupsParallel(); |
| } |
| |
| private List<AColGroup> compressColGroupsSingleThreaded() throws Exception { |
| List<AColGroup> ret = new ArrayList<>(csi.getNumberColGroups()); |
| |
| for(CompressedSizeInfoColGroup g : csi.getInfo()) |
| ret.add(compressColGroup(g)); |
| |
| return ret; |
| } |
| |
| private List<AColGroup> compressColGroupsParallel() throws Exception { |
| |
| final List<CompressedSizeInfoColGroup> groups = csi.getInfo(); |
| final int nGroups = groups.size(); |
| // final int blkz = nGroups * 10 / k; |
| final int skip = Math.min(k * 10, nGroups); |
| final List<CompressTask> tasks = new ArrayList<>(skip); |
| |
| // sort to make the "assumed" big jobs first. |
| Collections.sort(groups, Comparator.comparing(g -> -g.getNumVals())); |
| |
| final AColGroup[] ret = new AColGroup[nGroups]; |
| |
| for(int i = 0; i < skip; i++) |
| tasks.add(new CompressTask(groups, ret, i, skip)); |
| |
| for(Future<Object> t : pool.invokeAll(tasks)) |
| t.get(); |
| |
| return Arrays.asList(ret); |
| |
| } |
| |
	/**
	 * Compress a single column group, optionally timing it and logging estimated vs actual cost.
	 *
	 * @param cg The estimated information about the column group to compress
	 * @return The compressed column group
	 * @throws Exception if the compression of the group fails
	 */
	protected AColGroup compressColGroup(CompressedSizeInfoColGroup cg) throws Exception {
		// only time and log for moderately wide inputs when debug logging and a cost estimator are present
		if(LOG.isDebugEnabled() && nCol < 1000 && ce != null) {
			final Timing time = new Timing(true);
			final AColGroup ret = compressColGroupAllSteps(cg);
			logEstVsActual(time.stop(), ret, cg);
			return ret;
		}
		return compressColGroupAllSteps(cg);
	}
| |
| private void logEstVsActual(double time, AColGroup act, CompressedSizeInfoColGroup est) { |
| final double estC = ce.getCost(est); |
| final double actC = ce.getCost(act, nRow); |
| final String retType = act.getClass().getSimpleName().toString(); |
| final String cols = est.getColumns().toString(); |
| final String wanted = est.getBestCompressionType().toString(); |
| if(estC < actC * 0.75) { |
| String warning = "The estimate cost is significantly off : " + est; |
| LOG.debug( |
| String.format("time[ms]: %10.2f %25s est %10.0f -- act %10.0f distinct:%5d cols:%s wanted:%s\n\t\t%s", time, |
| retType, estC, actC, act.getNumValues(), cols, wanted, warning)); |
| } |
| else { |
| LOG.debug(String.format("time[ms]: %10.2f %25s est %10.0f -- act %10.0f distinct:%5d cols:%s wanted:%s", time, |
| retType, estC, actC, act.getNumValues(), cols, wanted)); |
| } |
| |
| } |
| |
| private AColGroup compressColGroupAllSteps(CompressedSizeInfoColGroup cg) throws Exception { |
| AColGroup g = compress(cg); |
| if(ce != null && ce.shouldSparsify() && nCol >= 4) |
| g = sparsifyFOR(g); |
| return g; |
| } |
| |
| private static AColGroup sparsifyFOR(AColGroup g) { |
| if(g instanceof ColGroupDDC) |
| return ((ColGroupDDC) g).sparsifyFOR(); |
| else if(g instanceof ColGroupSDC) |
| return ((ColGroupSDC) g).sparsifyFOR(); |
| else |
| return g; |
| } |
| |
	/**
	 * Compress a single column group according to its estimated best compression type.
	 *
	 * Fast paths (EMPTY, UNCOMPRESSED, direct SDC from sparse transposed input, direct DDC, linear functional,
	 * DDCFOR) avoid extracting a bitmap; the remaining types extract a bitmap of value tuples to offsets first.
	 *
	 * @param cg The estimated information about the column group to compress
	 * @return The compressed column group
	 * @throws Exception if compression fails
	 */
	private AColGroup compress(CompressedSizeInfoColGroup cg) throws Exception {
		final IColIndex colIndexes = cg.getColumns();
		final CompressionType ct = cg.getBestCompressionType();
		final boolean t = cs.transposed;

		// Fast path compressions
		if(ct == CompressionType.EMPTY && (!t || isAllNanTransposed(cg)))
			return new ColGroupEmpty(colIndexes);
		else if(ct == CompressionType.UNCOMPRESSED) // don't construct mapping if uncompressed
			return ColGroupUncompressed.create(colIndexes, in, t);
		else if((ct == CompressionType.SDC || ct == CompressionType.CONST) //
			&& in.isInSparseFormat() //
			&& t && (//
			(colIndexes.size() > 1 && cg.getNumOffs() < 0.3 * nRow) //
				|| colIndexes.size() == 1))
			// sparse transposed input with few offsets: compress SDC directly from the sparse rows
			return compressSDCFromSparseTransposedBlock(colIndexes, cg.getNumVals(), cg.getTupleSparsity());
		else if(ct == CompressionType.DDC)
			return directCompressDDC(colIndexes, cg);
		else if(ct == CompressionType.LinearFunctional)
			return compressLinearFunctional(colIndexes, in, cs);
		else if(ct == CompressionType.DDCFOR) {
			// DDCFOR is DDC plus frame-of-reference; only applicable if DDC compression actually produced a DDC group
			AColGroup g = directCompressDDC(colIndexes, cg);
			if(g instanceof ColGroupDDC)
				return ColGroupDDCFOR.sparsifyFOR((ColGroupDDC) g);
			return g;
		}

		// Remaining compression types need the extracted bitmap of value tuples to row offsets.
		final ABitmap ubm = BitmapEncoder.extractBitmap(colIndexes, in, cg.getNumVals(), cs);
		if(ubm == null) // no values ... therefore empty
			return new ColGroupEmpty(colIndexes);

		final IntArrayList[] of = ubm.getOffsetList();
		if(of.length == 1 && of[0].size() == nRow) // If this always constant
			return ColGroupConst.create(colIndexes, DictionaryFactory.create(ubm));

		// only use the estimated tuple sparsity for wider groups; narrow tuples are treated as dense
		final double tupleSparsity = colIndexes.size() > 4 ? cg.getTupleSparsity() : 1.0;

		switch(ct) {
			case RLE:
				return ColGroupRLE.compressRLE(colIndexes, ubm, nRow, tupleSparsity);
			case OLE:
				return ColGroupOLE.compressOLE(colIndexes, ubm, nRow, tupleSparsity);
			case CONST: // in case somehow one requested const, but it was not const column fall back to SDC.
			case EMPTY: // intentional fall-through into SDC after the warning
				LOG.warn("Requested " + ct + " on non constant column, fallback to SDC");
			case SDC:
				return compressSDC(colIndexes, nRow, ubm, cs, tupleSparsity);
			case SDCFOR:
				// SDCFOR is SDC plus frame-of-reference; only applicable if SDC compression produced an SDC group
				AColGroup g = compressSDC(colIndexes, nRow, ubm, cs, tupleSparsity);
				if(g instanceof ColGroupSDC)
					return ColGroupSDCFOR.sparsifyFOR((ColGroupSDC) g);
				return g;
			default:
				throw new DMLCompressionException("Not implemented compression of " + ct + " in factory.");
		}
	}
| |
| private AColGroup directCompressDDC(IColIndex colIndexes, CompressedSizeInfoColGroup cg) throws Exception { |
| if(colIndexes.size() > 1) |
| return directCompressDDCMultiCol(colIndexes, cg); |
| else |
| return directCompressDDCSingleCol(colIndexes, cg); |
| } |
| |
	/**
	 * Directly compress a single column to DDC, without materializing a bitmap.
	 *
	 * @param colIndexes The single column index of the group
	 * @param cg         The estimated compression information
	 * @return A DDC column group, or empty if no values were found
	 */
	private AColGroup directCompressDDCSingleCol(IColIndex colIndexes, CompressedSizeInfoColGroup cg) {
		final int col = colIndexes.get(0);
		// mapping sized to estimated distinct count +1, capped at nRow, but at least 126
		// (presumably to start within byte-mapping range -- TODO confirm)
		final AMapToData d = MapToFactory.create(nRow, Math.max(Math.min(cg.getNumOffs() + 1, nRow), 126));
		final DoubleCountHashMap map = new DoubleCountHashMap(cg.getNumVals());

		// unlike multi-col no special handling of zero entries are needed.
		if(cs.transposed)
			readToMapDDCTransposed(col, map, d);
		else
			readToMapDDC(col, map, d);

		if(map.size() == 0)
			return new ColGroupEmpty(colIndexes);

		ADictionary dict = DictionaryFactory.create(map);
		final int nUnique = map.size();
		// shrink the mapping to the actual number of distinct values
		final AMapToData resData = MapToFactory.resize(d, nUnique);
		return ColGroupDDC.create(colIndexes, dict, resData, null);
	}
| |
	/**
	 * Directly compress multiple columns to DDC, without materializing a bitmap.
	 *
	 * Rows where all selected columns are zero are left at a fill sentinel in the mapping; if any such row exists
	 * ('extra' is true), a zero tuple is accounted for in the dictionary and the sentinel is replaced by its id.
	 *
	 * @param colIndexes The columns of the group
	 * @param cg         The estimated compression information
	 * @return A DDC column group, or empty if no values were found
	 * @throws Exception if the parallel read fails
	 */
	private AColGroup directCompressDDCMultiCol(IColIndex colIndexes, CompressedSizeInfoColGroup cg) throws Exception {
		// mapping sized to estimated distinct count +1, capped at nRow, but at least 126
		final AMapToData d = MapToFactory.create(nRow, Math.max(Math.min(cg.getNumOffs() + 1, nRow), 126));
		// sentinel id marking rows not written by the readers (all-zero rows)
		final int fill = d.getUpperBoundValue();
		d.fill(fill);

		final DblArrayCountHashMap map = new DblArrayCountHashMap(Math.max(cg.getNumVals(), 64), colIndexes.size());
		boolean extra;
		if(nRow < CompressionSettings.PAR_DDC_THRESHOLD || k == 1)
			extra = readToMapDDC(colIndexes, map, d, 0, nRow, fill);
		else
			extra = parallelReadToMapDDC(colIndexes, map, d, nRow, fill, k);

		if(map.size() == 0)
			// If the column was empty.
			// This is highly unlikely but could happen if forced compression of
			// not transposed column and the estimator says use DDC.
			return new ColGroupEmpty(colIndexes);

		ADictionary dict = DictionaryFactory.create(map, colIndexes.size(), extra, cg.getTupleSparsity());

		if(extra)
			// point all skipped (all-zero) rows at the appended zero tuple id
			d.replace(fill, map.size());
		final int nUnique = map.size() + (extra ? 1 : 0);
		final AMapToData resData = MapToFactory.resize(d, nUnique);
		return ColGroupDDC.create(colIndexes, dict, resData, null);
	}
| |
	/**
	 * Read rows [rl, ru) of the selected columns into the tuple map and row mapping for multi-column DDC.
	 *
	 * The reader skips rows where all selected columns are zero; such skipped rows are detected by a gap between the
	 * expected row r and the reader's current row, and are reported via the returned flag so the caller can account
	 * for an implicit zero tuple. Skipped rows keep the caller's fill value in {@code data}.
	 *
	 * @param colIndexes columns to read
	 * @param map        map from value tuple to dictionary id
	 * @param data       row-to-id mapping to fill
	 * @param rl         row lower bound (inclusive)
	 * @param ru         row upper bound (exclusive)
	 * @param fill       the sentinel id the caller pre-filled for skipped rows (not written here)
	 * @return true if at least one row in [rl, ru) was skipped (i.e. contained only zeros)
	 */
	private boolean readToMapDDC(IColIndex colIndexes, DblArrayCountHashMap map, AMapToData data, int rl, int ru,
		int fill) {
		ReaderColumnSelection reader = ReaderColumnSelection.createReader(in, colIndexes, cs.transposed, rl, ru);
		DblArray cellVals = reader.nextRow();
		boolean extra = false;
		int r = rl;
		while(r < ru && cellVals != null) {
			final int row = reader.getCurrentRowIndex();
			if(row == r) {
				// expected row present: record its tuple id and advance
				final int id = map.increment(cellVals);
				data.set(row, id);
				cellVals = reader.nextRow();
				r++;
			}
			else {
				// gap: rows [r, row) were all-zero and skipped by the reader
				r = row;
				extra = true;
			}
		}

		// trailing all-zero rows after the last row the reader returned
		if(r < ru)
			extra = true;

		return extra;
	}
| |
| private void readToMapDDC(int col, DoubleCountHashMap map, AMapToData data) { |
| if(in.isInSparseFormat()) { |
| // not good but could happen |
| final SparseBlock sb = in.getSparseBlock(); |
| for(int r = 0; r < nRow; r++) { |
| if(sb.isEmpty(r)) |
| data.set(r, map.increment(0)); |
| else { |
| final int apos = sb.pos(r); |
| final int alen = sb.size(r) + apos; |
| final int[] aix = sb.indexes(r); |
| final int idx = Arrays.binarySearch(aix, apos, alen, col); |
| if(idx < 0) |
| data.set(r, map.increment(0)); |
| else |
| data.set(r, map.increment(sb.values(r)[idx])); |
| } |
| } |
| } |
| else if(in.getDenseBlock().isContiguous()) { |
| final double[] dv = in.getDenseBlockValues(); |
| int off = col; |
| for(int r = 0; r < nRow; r++, off += nCol) { |
| final int id = map.increment(dv[off]); |
| data.set(r, id); |
| } |
| } |
| else { |
| throw new NotImplementedException(""); |
| } |
| } |
| |
| private void readToMapDDCTransposed(int col, DoubleCountHashMap map, AMapToData data) { |
| if(in.isInSparseFormat()) { |
| final SparseBlock sb = in.getSparseBlock(); |
| if(sb.isEmpty(col)) |
| // It should never be empty here. |
| return; |
| |
| final int apos = sb.pos(col); |
| final int alen = sb.size(col) + apos; |
| final int[] aix = sb.indexes(col); |
| final double[] aval = sb.values(col); |
| // count zeros |
| if(nRow - apos - alen > 0) |
| map.increment(0, nRow - apos - alen); |
| // insert all other counts |
| for(int j = apos; j < alen; j++) { |
| final int id = map.increment(aval[j]); |
| data.set(aix[j], id); |
| } |
| } |
| else { |
| final DenseBlock db = in.getDenseBlock(); |
| final double[] dv = db.values(col); |
| int off = db.pos(col); |
| for(int r = 0; r < nRow; r++, off++) |
| data.set(r, map.increment(dv[off])); |
| } |
| } |
| |
| private boolean parallelReadToMapDDC(IColIndex colIndexes, DblArrayCountHashMap map, AMapToData data, int rlen, |
| int fill, int k) throws Exception { |
| |
| final int blk = Math.max(rlen / colIndexes.size() / k, 64000 / colIndexes.size()); |
| |
| List<readToMapDDCTask> tasks = new ArrayList<>(); |
| for(int i = 0; i < rlen; i += blk) { |
| int end = Math.min(rlen, i + blk); |
| tasks.add(new readToMapDDCTask(colIndexes, map, data, i, end, fill)); |
| } |
| boolean extra = false; |
| for(Future<Boolean> t : pool.invokeAll(tasks)) |
| extra |= t.get(); |
| |
| return extra; |
| |
| } |
| |
	/**
	 * Compress into one of the SDC variants, using the most frequent tuple (or zero) as shared default value.
	 *
	 * @param colIndexes    columns of the group
	 * @param rlen          number of rows
	 * @param ubm           extracted bitmap of value tuples to row offsets
	 * @param cs            compression settings
	 * @param tupleSparsity sparsity of the dictionary tuples
	 * @return the compressed SDC column group
	 */
	private static AColGroup compressSDC(IColIndex colIndexes, int rlen, ABitmap ubm, CompressionSettings cs,
		double tupleSparsity) {

		final int numZeros = ubm.getNumZeros();
		IntArrayList[] offs = ubm.getOffsetList();
		int largestOffset = offs[0].size();
		int largestIndex = 0;
		if(!cs.sortTuplesByFrequency) {
			// find the most frequent tuple to use as the default
			int index = 0;
			for(IntArrayList a : ubm.getOffsetList()) {
				if(a.size() > largestOffset) {
					largestOffset = a.size();
					largestIndex = index;
				}
				index++;
			}
		}
		final int nVal = ubm.getNumValues();

		// Currently not efficient allocation of the dictionary.
		if(nVal == 1 && numZeros >= largestOffset) {
			// a single distinct value and zero at least as frequent: zero is the default
			ADictionary dict = DictionaryFactory.create(ubm, tupleSparsity);
			final AOffset off = OffsetFactory.createOffset(ubm.getOffsetList()[0].extractValues(true));
			return ColGroupSDCSingleZeros.create(colIndexes, rlen, dict, off, null);
		}
		else if((nVal == 2 && numZeros == 0) // case 1 : two distinct non zero values
			|| (nVal == 1 && numZeros < largestOffset) // case 2: 1 non zero value more frequent than zero.
		) {
			double[] defaultTuple = new double[colIndexes.size()];
			ADictionary dict = DictionaryFactory.create(ubm, largestIndex, defaultTuple, tupleSparsity, numZeros > 0);
			return compressSDCSingle(colIndexes, rlen, ubm, largestIndex, dict, defaultTuple);
		}
		else if(numZeros >= largestOffset) {
			// zero is the most frequent: the default tuple is all-zero
			ADictionary dict = DictionaryFactory.create(ubm, tupleSparsity);
			return compressSDCZero(colIndexes, rlen, ubm, dict, cs);
		}
		else
			return compressSDCNormal(colIndexes, numZeros, rlen, ubm, largestIndex, tupleSparsity, cs);

	}
| |
| private static AColGroup compressSDCZero(IColIndex colIndexes, int rlen, ABitmap ubm, ADictionary dict, |
| CompressionSettings cs) { |
| IntArrayList[] offsets = ubm.getOffsetList(); |
| AInsertionSorter s = InsertionSorterFactory.create(rlen, offsets, cs.sdcSortType); |
| AOffset indexes = OffsetFactory.createOffset(s.getIndexes()); |
| AMapToData data = s.getData(); |
| data = MapToFactory.resize(data, dict.getNumberOfValues(colIndexes.size())); |
| return ColGroupSDCZeros.create(colIndexes, rlen, dict, indexes, data, null); |
| } |
| |
| private static AColGroup compressSDCNormal(IColIndex colIndexes, int numZeros, int rlen, ABitmap ubm, |
| int largestIndex, double tupleSparsity, CompressionSettings cs) { |
| final double[] defaultTuple = new double[colIndexes.size()]; |
| final ADictionary dict = DictionaryFactory.create(ubm, largestIndex, defaultTuple, tupleSparsity, numZeros > 0); |
| AInsertionSorter s = InsertionSorterFactory.createNegative(rlen, ubm.getOffsetList(), largestIndex, |
| cs.sdcSortType); |
| AOffset indexes = OffsetFactory.createOffset(s.getIndexes()); |
| AMapToData _data = s.getData(); |
| _data = MapToFactory.resize(_data, dict.getNumberOfValues(colIndexes.size())); |
| return ColGroupSDC.create(colIndexes, rlen, dict, defaultTuple, indexes, _data, null); |
| } |
| |
	/**
	 * Compress a group with exactly one dictionary tuple plus a default tuple (SDCSingle).
	 *
	 * @param colIndexes   columns of the group
	 * @param rlen         number of rows
	 * @param ubm          extracted bitmap
	 * @param largestIndex index of the most frequent tuple (the default)
	 * @param dict         dictionary containing the non-default tuple
	 * @param defaultTuple the values of the default tuple
	 * @return the compressed column group
	 */
	private static AColGroup compressSDCSingle(IColIndex colIndexes, int rlen, ABitmap ubm, int largestIndex,
		ADictionary dict, double[] defaultTuple) {
		if(ubm.getOffsetList().length > 1) {
			// flipping first bit is same as saying index 1 if zero else index 0 if one or !
			AOffset off = OffsetFactory.createOffset(ubm.getOffsetsList(largestIndex ^ 1));
			return ColGroupSDCSingle.create(colIndexes, rlen, dict, defaultTuple, off, null);
		}
		else {
			// single offset list: materialize the complement of its offsets
			IntArrayList inv = ubm.getOffsetsList(0);
			int[] indexes = new int[rlen - inv.size()];
			int p = 0; // write position into indexes
			int v = 0; // current row
			for(int i = 0; i < inv.size(); i++) {
				int j = inv.get(i);
				// emit every row strictly before the next contained offset
				while(v < j)
					indexes[p++] = v++;
				v++; // skip the contained offset itself
			}

			// emit the remaining rows after the last contained offset
			while(v < rlen)
				indexes[p++] = v++;
			AOffset off = OffsetFactory.createOffset(indexes);

			return ColGroupSDCSingle.create(colIndexes, rlen, dict, defaultTuple, off, null);
		}
	}
| |
| private static AColGroup compressLinearFunctional(IColIndex colIndexes, MatrixBlock in, CompressionSettings cs) { |
| double[] coefficients = LinearRegression.regressMatrixBlock(in, colIndexes, cs.transposed); |
| int numRows = cs.transposed ? in.getNumColumns() : in.getNumRows(); |
| return ColGroupLinearFunctional.create(colIndexes, coefficients, numRows); |
| } |
| |
| private AColGroup compressSDCFromSparseTransposedBlock(IColIndex cols, int nrUniqueEstimate, double tupleSparsity) { |
| if(cols.size() > 1) |
| return compressMultiColSDCFromSparseTransposedBlock(cols, nrUniqueEstimate, tupleSparsity); |
| else |
| return compressSingleColSDCFromSparseTransposedBlock(cols, nrUniqueEstimate); |
| } |
| |
	/**
	 * Compress multiple columns of a sparse transposed input directly to SDC with an all-zero default tuple.
	 *
	 * Phase 1 collects the union of row offsets that are non-zero in any of the columns; phase 2 gathers those rows
	 * into a small dense sub-matrix; phase 3 builds the dictionary and mapping from the sub-matrix.
	 *
	 * @param cols             columns of the group
	 * @param nrUniqueEstimate estimated number of distinct tuples (sizes the hash map)
	 * @param tupleSparsity    sparsity of the dictionary tuples
	 * @return the compressed column group
	 */
	private AColGroup compressMultiColSDCFromSparseTransposedBlock(IColIndex cols, int nrUniqueEstimate,
		double tupleSparsity) {

		HashSet<Integer> offsetsSet = new HashSet<>();

		SparseBlock sb = in.getSparseBlock();

		// phase 1: union of non-zero row indexes over all selected columns
		for(int i = 0; i < cols.size(); i++) {
			final int idx = cols.get(i);
			if(sb.isEmpty(idx))
				continue;

			final int apos = sb.pos(idx);
			final int alen = sb.size(idx) + apos;
			final int[] aix = sb.indexes(idx);
			for(int j = apos; j < alen; j++)
				offsetsSet.add(aix[j]);
		}

		if(offsetsSet.isEmpty())
			return new ColGroupEmpty(cols);

		int[] offsetsInt = offsetsSet.stream().mapToInt(Number::intValue).toArray();
		Arrays.sort(offsetsInt);

		// phase 2: gather the non-zero rows into a dense sub-matrix of shape (offsets x cols)
		MatrixBlock sub = new MatrixBlock(offsetsInt.length, cols.size(), false);
		sub.allocateDenseBlock();
		sub.setNonZeros(offsetsInt.length * cols.size());
		double[] subV = sub.getDenseBlockValues();

		for(int i = 0; i < cols.size(); i++) {
			final int idx = cols.get(i);
			if(sb.isEmpty(idx))
				continue;
			final int apos = sb.pos(idx);
			final int alen = sb.size(idx) + apos;
			final int[] aix = sb.indexes(idx);
			final double[] aval = sb.values(idx);
			int offsetsPos = 0;
			// merge-join the sorted sparse indexes with the sorted union of offsets
			for(int j = apos; j < alen; j++) {
				while(offsetsInt[offsetsPos] < aix[j])
					offsetsPos++;
				if(offsetsInt[offsetsPos] == aix[j])
					subV[offsetsPos * cols.size() + i] = aval[j];
			}
		}
		// phase 3: build the dictionary and mapping from the dense sub-matrix
		IColIndex subCols = ColIndexFactory.create(cols.size());
		ReaderColumnSelection reader = ReaderColumnSelection.createReader(sub, subCols, false);
		final int mapStartSize = Math.min(nrUniqueEstimate, offsetsInt.length / 2);
		DblArrayCountHashMap map = new DblArrayCountHashMap(mapStartSize, subCols.size());

		DblArray cellVals = null;
		// initial distinct-capacity 257; resized to the actual count below -- TODO confirm rationale
		AMapToData data = MapToFactory.create(offsetsInt.length, 257);

		while((cellVals = reader.nextRow()) != null) {
			final int row = reader.getCurrentRowIndex();
			data.set(row, map.increment(cellVals));
		}

		ADictionary dict = DictionaryFactory.create(map, cols.size(), false, tupleSparsity);
		data = MapToFactory.resize(data, map.size());

		AOffset offs = OffsetFactory.createOffset(offsetsInt);
		// transposed input: the number of rows of the result equals in.getNumColumns()
		return ColGroupSDCZeros.create(cols, in.getNumColumns(), dict, offs, data, null);
	}
| |
	/**
	 * Compress a single column of a sparse transposed input directly to an SDC variant.
	 *
	 * Counts distinct value frequencies (NaN mapped to zero), then chooses between zero-default groups
	 * (SDCSingleZeros / SDCZeros), a single-non-zero-value default (SDCSingle), or a fallback to the generic SDC
	 * path via bitmap extraction.
	 *
	 * @param cols             the single column of the group
	 * @param nrUniqueEstimate estimated number of distinct values (sizes the hash map)
	 * @return the compressed column group
	 */
	private AColGroup compressSingleColSDCFromSparseTransposedBlock(IColIndex cols, int nrUniqueEstimate) {

		// This method should only be called if the cols argument is length 1.
		final SparseBlock sb = in.getSparseBlock();
		if(sb.isEmpty(cols.get(0)))
			return new ColGroupEmpty(cols);

		final int sbRow = cols.get(0);
		final int apos = sb.pos(sbRow);
		final int alen = sb.size(sbRow) + apos;
		final double[] vals = sb.values(sbRow);
		final DoubleCountHashMap map = new DoubleCountHashMap(nrUniqueEstimate);

		// count distinct items frequencies (NaN is treated as zero)
		for(int j = apos; j < alen; j++)
			if(!Double.isNaN(vals[j]))
				map.increment(vals[j]);
			else
				map.increment(0);

		// sort descending by frequency; entries[0] is the most frequent non-zero value
		DCounts[] entries = map.extractValues();
		Arrays.sort(entries, Comparator.comparing(x -> -x.count));

		if(entries[0].count < nRow - sb.size(sbRow)) {
			// If the zero is the default value.
			final int[] counts = new int[entries.length];
			final double[] dict = new double[entries.length];
			for(int i = 0; i < entries.length; i++) {
				final DCounts x = entries[i];
				counts[i] = x.count;
				dict[i] = x.key;
				x.count = i; // reuse the count field as the dictionary id for the mapping below
			}

			final AOffset offsets = OffsetFactory.createOffset(sb.indexes(sbRow), apos, alen);
			if(entries.length <= 1)
				return ColGroupSDCSingleZeros.create(cols, nRow, Dictionary.create(dict), offsets, counts);
			else {
				// map each non-zero entry to its dictionary id (NaN again mapped to zero)
				final AMapToData mapToData = MapToFactory.create((alen - apos), entries.length);
				for(int j = apos; j < alen; j++)
					if(!Double.isNaN(vals[j]))
						mapToData.set(j - apos, map.get(vals[j]));
					else
						mapToData.set(j - apos, map.get(0.0));
				return ColGroupSDCZeros.create(cols, nRow, Dictionary.create(dict), offsets, mapToData, counts);
			}
		}
		else if(entries.length == 1) {
			// SDCSingle and we know all values are x or 0
			// NOTE(review): 'nonZeros' here is the number of rows NOT holding the default x
			// (i.e. the zero rows), and 'notZeroOffsets' collects those zero rows -- the names are misleading.
			final int nonZeros = nRow - entries[0].count;
			final double x = entries[0].key;
			final double[] defaultTuple = new double[] {x};
			final int[] counts = new int[] {nonZeros};
			final int[] notZeroOffsets = new int[nonZeros];
			final int[] aix = sb.indexes(sbRow);
			int i = 0; // write position into notZeroOffsets
			int r = 0; // current row
			// collect all rows up to the last non-zero index that are NOT in aix
			for(int j = apos; r < aix[alen - 1]; r++) {
				if(r == aix[j])
					j++; // row r holds the default value x; skip it
				else
					notZeroOffsets[i++] = r;
			}
			r++; // step past the last non-zero index itself

			// every row after the last non-zero index is a zero row
			for(; r < nRow; r++, i++)
				notZeroOffsets[i] = r;

			final AOffset offsets = OffsetFactory.createOffset(notZeroOffsets);

			return ColGroupSDCSingle.create(cols, nRow, null, defaultTuple, offsets, counts);
		}
		else {
			final ABitmap ubm = BitmapEncoder.extractBitmap(cols, in, true, entries.length, true);
			// zero is not the default value fall back to the standard compression path.
			return compressSDC(cols, nRow, ubm, cs, 1.0);
		}
	}
| |
| private boolean isAllNanTransposed(CompressedSizeInfoColGroup cg) { |
| final IColIndex cols = cg.getColumns(); |
| return in.isInSparseFormat() ? isAllNanTransposedSparse(cols) : isAllNanTransposedDense(cols); |
| } |
| |
| private boolean isAllNanTransposedSparse(IColIndex cols) { |
| SparseBlock sb = in.getSparseBlock(); |
| IIterate it = cols.iterator(); |
| while(it.hasNext()) { |
| int c = it.next(); |
| if(sb.isEmpty(c)) |
| continue; |
| double[] vl = sb.values(c); |
| for(double v : vl) { |
| if(!Double.isNaN(v)) |
| return false; |
| } |
| } |
| return true; |
| } |
| |
	/**
	 * Check that every value of the given columns (rows of the transposed dense input) is NaN.
	 *
	 * @param cols the columns to check
	 * @return false as soon as any non-NaN value is found, otherwise true
	 */
	private boolean isAllNanTransposedDense(IColIndex cols) {
		if(in.getDenseBlock().isContiguous()) {
			double[] vals = in.getDenseBlockValues();
			IIterate it = cols.iterator();
			while(it.hasNext()) {
				int c = it.next();
				// transposed contiguous layout: column c occupies nRow consecutive values
				int off = c * nRow;
				for(int r = 0; r < nRow; r++) {
					if(!Double.isNaN(vals[off + r])) {
						return false;
					}
				}
			}
			return true;
		}
		else {
			// non-contiguous (large) dense block: resolve the containing sub-block per column
			DenseBlock db = in.getDenseBlock();
			IIterate it = cols.iterator();
			while(it.hasNext()) {
				int c = it.next();
				double[] vals = db.values(c);
				int off = db.pos(c);
				for(int r = 0; r < nRow; r++) {
					if(!Double.isNaN(vals[off + r]))
						return false;
				}
			}
			return true;
		}
	}
| |
	/**
	 * Task compressing every _step'th column group starting at offset _off, writing results into a shared array.
	 * The interleaved assignment balances the (sorted, biggest-first) groups across tasks.
	 */
	private class CompressTask implements Callable<Object> {

		/** All column groups to compress (shared, read-only) */
		private final List<CompressedSizeInfoColGroup> _groups;
		/** Shared result array; each task writes only its own interleaved slots */
		private final AColGroup[] _ret;
		/** Starting offset of this task */
		private final int _off;
		/** Stride between groups handled by this task */
		private final int _step;

		protected CompressTask(List<CompressedSizeInfoColGroup> groups, AColGroup[] ret, int off, int step) {
			_groups = groups;
			_ret = ret;
			_off = off;
			_step = step;
		}

		@Override
		public Object call() throws Exception {
			for(int i = _off; i < _groups.size(); i += _step)
				_ret[i] = compressColGroup(_groups.get(i));
			return null;
		}
	}
| |
	/**
	 * Task reading a row range [_rl, _ru) into the shared map and mapping for parallel multi-column DDC reads.
	 * Returns true if the range contained skipped (all-zero) rows.
	 */
	private class readToMapDDCTask implements Callable<Boolean> {
		/** Columns to read */
		private final IColIndex _colIndexes;
		/** Shared map from value tuple to dictionary id */
		private final DblArrayCountHashMap _map;
		/** Shared row-to-id mapping */
		private final AMapToData _data;
		/** Row range lower bound (inclusive) */
		private final int _rl;
		/** Row range upper bound (exclusive) */
		private final int _ru;
		/** Sentinel id pre-filled for skipped rows */
		private final int _fill;

		protected readToMapDDCTask(IColIndex colIndexes, DblArrayCountHashMap map, AMapToData data, int rl, int ru,
			int fill) {
			_colIndexes = colIndexes;
			_map = map;
			_data = data;
			_rl = rl;
			_ru = ru;
			_fill = fill;
		}

		@Override
		public Boolean call() {
			return Boolean.valueOf(readToMapDDC(_colIndexes, _map, _data, _rl, _ru, _fill));
		}
	}
| } |