blob: a4347dfa35be8b5243ec5b01c2cebcd4543fcd37 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.sysds.runtime.compress.cocode;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.compress.CompressionSettings;
import org.apache.sysds.runtime.compress.estim.CompressedSizeEstimator;
import org.apache.sysds.runtime.compress.estim.CompressedSizeInfo;
import org.apache.sysds.runtime.compress.estim.CompressedSizeInfoColGroup;
import org.apache.sysds.runtime.util.CommonThreadPool;
public class PlanningCoCoder {
private static final Log LOG = LogFactory.getLog(PlanningCoCoder.class.getName());
public enum PartitionerType {
* Main entry point of CoCode.
* This package groups together ColGroups across columns, to improve compression further,
* @param sizeEstimator The size estimator used for estimating ColGroups potential sizes.
* @param colInfos The information already gathered on the individual ColGroups of columns.
* @param numRows The number of rows in the input matrix.
* @param k The concurrency degree allowed for this operation.
* @param cs The Compression Settings used in the compression.
* @return The Estimated (hopefully) best groups of ColGroups.
public static List<int[]> findCoCodesByPartitioning(CompressedSizeEstimator sizeEstimator,
CompressedSizeInfo colInfos, int numRows, int k, CompressionSettings cs) {
// filtering out non-group-able columns as singleton groups
// weight is the ratio of its cardinality to the number of rows
List<Integer> cols = colInfos.colsC;
CompressedSizeInfoColGroup[] colGroups = colInfos.compressionInfo;
int numCols = cols.size();
List<Integer> groupCols = new ArrayList<>();
HashMap<Integer, GroupableColInfo> groupColsInfo = new HashMap<>();
for(int i = 0; i < numCols; i++) {
int colIx = cols.get(i);
int cardinality = colGroups[colIx].getEstCard();
double weight = ((double) cardinality) / numRows;
groupColsInfo.put(colIx, new GroupableColInfo(weight, colGroups[colIx].getMinSize(), cardinality));
// use column group partitioner to create partitions of columns
List<int[]> bins = createColumnGroupPartitioner(cs.columnPartitioner)
.partitionColumns(groupCols, groupColsInfo, cs);
if(cs.columnPartitioner == PartitionerType.COST) {
return bins;
// brute force grouping within each partition
return (k > 1) ? getCocodingGroupsBruteForce(bins,
k) : getCocodingGroupsBruteForce(bins, groupColsInfo, sizeEstimator, numRows);
private static List<int[]> getCocodingGroupsBruteForce(List<int[]> bins,
HashMap<Integer, GroupableColInfo> groupColsInfo, CompressedSizeEstimator estimator, int rlen) {
List<int[]> retGroups = new ArrayList<>();
for(int[] bin : bins) {
// building an array of singleton CoCodingGroup
ArrayList<PlanningCoCodingGroup> sgroups = new ArrayList<>();
for(int col : bin)
sgroups.add(new PlanningCoCodingGroup(col, groupColsInfo.get(col)));
// brute force co-coding
PlanningCoCodingGroup[] outputGroups = findCocodesBruteForce(estimator,
sgroups.toArray(new PlanningCoCodingGroup[0]));
for(PlanningCoCodingGroup grp : outputGroups)
return retGroups;
private static List<int[]> getCocodingGroupsBruteForce(List<int[]> bins,
HashMap<Integer, GroupableColInfo> groupColsInfo, CompressedSizeEstimator estimator, int rlen, int k) {
List<int[]> retGroups = new ArrayList<>();
try {
ExecutorService pool = CommonThreadPool.get(k);
ArrayList<CocodeTask> tasks = new ArrayList<>();
for(int[] bin : bins) {
// building an array of singleton CoCodingGroup
ArrayList<PlanningCoCodingGroup> sgroups = new ArrayList<>();
for(int col : bin)
sgroups.add(new PlanningCoCodingGroup(col, groupColsInfo.get(col)));
tasks.add(new CocodeTask(estimator, sgroups, rlen));
List<Future<PlanningCoCodingGroup[]>> rtask = pool.invokeAll(tasks);
for(Future<PlanningCoCodingGroup[]> lrtask : rtask)
for(PlanningCoCodingGroup grp : lrtask.get())
catch(Exception ex) {
throw new DMLRuntimeException(ex);
return retGroups;
* Identify columns to code together. Uses a greedy approach that merges pairs of column groups into larger groups.
* Each phase of the greedy algorithm considers all combinations of pairs to merge.
* TODO Find better faster ways of finding cocodes than brute force.
* @param sizeEstimator compressed size estimator
* @param numRowsWeight number of rows weight
* @param singltonGroups planning co-coding groups
* @return A PlanningCoCodingGroup.
private static PlanningCoCodingGroup[] findCocodesBruteForce(CompressedSizeEstimator estimator, int numRows,
PlanningCoCodingGroup[] singletonGroups) {
LOG.trace("Cocoding: process " + singletonGroups.length);
List<PlanningCoCodingGroup> workset = new ArrayList<>(Arrays.asList(singletonGroups));
// establish memo table for extracted column groups
PlanningMemoTable memo = new PlanningMemoTable();
// process merging iterations until no more change
boolean changed = true;
while(changed && workset.size() > 1) {
// find best merge, incl memoization
PlanningCoCodingGroup tmp = null;
for(int i = 0; i < workset.size(); i++) {
for(int j = i + 1; j < workset.size(); j++) {
PlanningCoCodingGroup c1 = workset.get(i);
PlanningCoCodingGroup c2 = workset.get(j);
memo.incrStats(1, 0, 0);
// pruning filter: skip dominated candidates
if(-Math.min(c1.getEstSize(), c2.getEstSize()) > memo.getOptChangeInSize())
// memoization or newly created group (incl bitmap extraction)
PlanningCoCodingGroup c1c2 = memo.getOrCreate(c1, c2, estimator, numRows);
// keep best merged group only
if(tmp == null || c1c2.getChangeInSize() < tmp.getChangeInSize() ||
(c1c2.getChangeInSize() == tmp.getChangeInSize() &&
c1c2.getColIndices().length < tmp.getColIndices().length))
tmp = c1c2;
// modify working set
if(tmp != null && tmp.getChangeInSize() < 0) {
if(LOG.isTraceEnabled()) {
LOG.trace("--merge groups: " + Arrays.toString(tmp.getLeftGroup().getColIndices()) + " and "
+ Arrays.toString(tmp.getRightGroup().getColIndices()));
else {
changed = false;
LOG.trace("--stats: " + Arrays.toString(memo.getStats()));
return workset.toArray(new PlanningCoCodingGroup[0]);
private static ColumnGroupPartitioner createColumnGroupPartitioner(PartitionerType type) {
switch(type) {
return new ColumnGroupPartitionerBinPacking();
case STATIC:
return new ColumnGroupPartitionerStatic();
case COST:
return new ColumnGroupPartitionerCost();
throw new RuntimeException("Unsupported column group partitioner: " + type.toString());
public static class GroupableColInfo {
public final double cardRatio;
public final long size;
public final int nrDistinct;
public GroupableColInfo(double lcardRatio, long lsize, int cardinality) {
cardRatio = lcardRatio;
size = lsize;
nrDistinct = cardinality;
private static class CocodeTask implements Callable<PlanningCoCodingGroup[]> {
private CompressedSizeEstimator _estim = null;
private ArrayList<PlanningCoCodingGroup> _sgroups = null;
private int _rlen = -1;
protected CocodeTask(CompressedSizeEstimator estim, ArrayList<PlanningCoCodingGroup> sgroups, int rlen) {
_estim = estim;
_sgroups = sgroups;
_rlen = rlen;
public PlanningCoCodingGroup[] call() {
// brute force co-coding
return findCocodesBruteForce(_estim, _rlen, _sgroups.toArray(new PlanningCoCodingGroup[0]));