| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.phoenix.util; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.List; |
| |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hbase.util.Pair; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| /** |
| * Equi-Depth histogram based on http://web.cs.ucla.edu/~zaniolo/papers/Histogram-EDBT2011-CamReady.pdf, |
| * but without the sliding window - we assume a single window over the entire data set. |
| * |
| * Used to generate the bucket boundaries of a histogram where each bucket has the same # of items. |
| * This is useful, for example, for pre-splitting an index table, by feeding in data from the indexed column. |
| * Works on streaming data - the histogram is dynamically updated for each new value. |
| * |
| * Add values by calling addValue(), then at the end computeBuckets() can be called to get |
| * the buckets with their bounds. |
| * |
| * Average time complexity: O(log(B x p) + (B x p)/T) = nearly constant |
| * B = number of buckets, p = expansion factor constant, T = # of values |
| * |
| * Space complexity: different from paper since here we keep the blocked bars but don't have expiration, |
| * comes out to basically O(log(T)) |
| */ |
| public class EquiDepthStreamHistogram { |
| private static final Logger LOGGER = LoggerFactory.getLogger(EquiDepthStreamHistogram.class); |
| |
| // used in maxSize calculation for each bar |
| private static final double MAX_COEF = 1.7; |
| // higher expansion factor = better accuracy and worse performance |
| private static final short DEFAULT_EXPANSION_FACTOR = 7; |
| private int numBuckets; |
| private int maxBars; |
| @VisibleForTesting |
| long totalCount; // number of values - i.e. count across all bars |
| @VisibleForTesting |
| List<Bar> bars; |
| |
| /** |
| * Create a new histogram |
| * @param numBuckets number of buckets, which can be used to get the splits |
| */ |
| public EquiDepthStreamHistogram(int numBuckets) { |
| this(numBuckets, DEFAULT_EXPANSION_FACTOR); |
| } |
| |
| /** |
| * @param numBuckets number of buckets |
| * @param expansionFactor number of bars = expansionFactor * numBuckets |
| * The more bars, the better the accuracy, at the cost of worse performance |
| */ |
| public EquiDepthStreamHistogram(int numBuckets, int expansionFactor) { |
| this.numBuckets = numBuckets; |
| this.maxBars = numBuckets * expansionFactor; |
| this.bars = new ArrayList<>(maxBars); |
| } |
| |
| /** |
| * Add a new value to the histogram, updating the count for the appropriate bucket |
| * @param value |
| */ |
| public void addValue(byte[] value) { |
| Bar bar = getBar(value); |
| bar.incrementCount(); |
| totalCount++; |
| // split the bar if necessary |
| if (bar.getSize() > getMaxBarSize()) { |
| splitBar(bar); |
| } |
| } |
| |
| /** |
| * Compute the buckets, which have the boundaries and estimated counts. |
| * Note that the right bound for the very last bucket is inclusive. |
| * The left and right bounds can be equivalent, for single value buckets. |
| * @return |
| */ |
| public List<Bucket> computeBuckets() { |
| Preconditions.checkState(bars.size() >= numBuckets, "Not enough data points to compute buckets"); |
| List<Bucket> buckets = new ArrayList<>(); |
| long idealBuckSize = (long) Math.ceil(totalCount / (double) numBuckets); |
| long currCount = 0; |
| int barsIdx = 0; |
| byte[] prevBound = bars.get(0).leftBoundInclusive; |
| Bar currBar = null; |
| for (int i = 0; i < numBuckets; i++) { |
| while (currCount <= idealBuckSize && barsIdx < bars.size()) { |
| currBar = bars.get(barsIdx++); |
| currCount += currBar.getSize(); |
| } |
| long surplus = Math.max(currCount - idealBuckSize, 0); |
| // deviate a bit from the paper here |
| // to estimate the bound, we split the range into 8 splits for a total of 10 including start/end |
| // then we calculate the % of the currBar's count we've used, and round down to the closest split |
| int closestSplitIdx = (int) ((1 - ((double) surplus / currBar.getSize())) * 9); |
| byte[][] splits = Bytes.split(currBar.leftBoundInclusive, currBar.rightBoundExclusive, 8); |
| Bucket bucket = new Bucket(prevBound, splits[closestSplitIdx]); |
| bucket.incrementCountEstimate(currCount - surplus); |
| prevBound = splits[closestSplitIdx]; |
| buckets.add(bucket); |
| currCount = surplus; |
| } |
| return buckets; |
| } |
| |
| /** |
| * @return total number of values added to this histogram |
| */ |
| public long getTotalCount() { |
| return totalCount; |
| } |
| |
| // attempts to split the given bar into two new bars |
| @VisibleForTesting |
| void splitBar(Bar origBar) { |
| // short circuit - don't split a bar of length 1 |
| if (Bytes.compareTo(origBar.leftBoundInclusive, origBar.rightBoundExclusive) == 0) { |
| return; |
| } |
| if (bars.size() == maxBars) { // max bars hit, need to merge two existing bars first |
| boolean mergeSuccessful = mergeBars(); |
| if (!mergeSuccessful) return; // don't split if we couldn't merge |
| } |
| byte[] mid = Bytes.split(origBar.getLeftBoundInclusive(), origBar.getRightBoundExclusive(), 1)[1]; |
| Bar newLeft = new Bar(origBar.getLeftBoundInclusive(), mid); |
| Bar newRight = new Bar(mid, origBar.getRightBoundExclusive()); |
| // distribute blocked bars between the new bars |
| long leftSize = 0; |
| long bbAggCount = origBar.getBlockedBarsSize(); |
| for (Bar bb : origBar.getBlockedBars()) { |
| long bbSize = bb.getSize(); |
| if (leftSize + bbSize < bbAggCount/2) { |
| leftSize += bbSize; |
| newLeft.addBlockedBar(bb); |
| } else { |
| newRight.addBlockedBar(bb); |
| } |
| } |
| // at this point the two new bars may have different counts, |
| // distribute the rest of origBar's count to make them as close as possible |
| long countToDistribute = origBar.getSize() - bbAggCount; |
| long rightSize = newRight.getSize(); |
| long sizeDiff = Math.abs(leftSize - rightSize); |
| Bar smallerBar = leftSize <= rightSize ? newLeft : newRight; |
| if (sizeDiff <= countToDistribute) { |
| smallerBar.incrementCount(sizeDiff); |
| countToDistribute -= sizeDiff; |
| long halfDistrib = countToDistribute / 2; |
| newLeft.incrementCount(halfDistrib); |
| newRight.incrementCount(countToDistribute - halfDistrib); |
| } else { |
| smallerBar.incrementCount(countToDistribute); |
| } |
| if (LOGGER.isTraceEnabled()) { |
| LOGGER.trace(String.format("Split orig=%s , newLeft=%s , newRight=%s", |
| origBar, newLeft, newRight)); |
| } |
| bars.remove(origBar); |
| bars.add(newLeft); |
| bars.add(newRight); |
| // technically don't need to sort here, as we can get the index from getBar, |
| // and put the new bars in the same index. But we'd have to handle merge as well, |
| // doable but not worth the more complicated code since bars.size is fixed and generally small |
| Collections.sort(bars); |
| } |
| |
| //Merges the two adjacent bars with the lowest summed count |
| @VisibleForTesting |
| boolean mergeBars() { |
| Preconditions.checkState(bars.size() > 1, "Need at least two bars to merge"); |
| // pairwise search for the two bars with the smallest summed count |
| int currIdx = 0; |
| Bar currBar = bars.get(currIdx); |
| Bar nextBar = bars.get(currIdx + 1); |
| long currMinSum = Long.MAX_VALUE; |
| int currMinIdx = currIdx; // keep this for fast removal from ArrayList later |
| Pair<Bar, Bar> minBars = new Pair<>(currBar, nextBar); |
| while (nextBar != null) { |
| long sum = currBar.getSize() + nextBar.getSize(); |
| if (sum < currMinSum) { |
| currMinSum = sum; |
| minBars = new Pair<>(currBar, nextBar); |
| currMinIdx = currIdx; |
| } |
| currBar = nextBar; |
| nextBar = ++currIdx < bars.size() - 1 ? bars.get(currIdx+1) : null; |
| } |
| // don't want to merge bars into one that will just need an immediate split again |
| if (currMinSum >= getMaxBarSize()) { |
| return false; |
| } |
| // do the merge |
| Bar leftBar = minBars.getFirst(); |
| Bar rightBar = minBars.getSecond(); |
| Bar newBar = new Bar(leftBar.getLeftBoundInclusive(), rightBar.getRightBoundExclusive()); |
| if (leftBar.getSize() >= rightBar.getSize()) { |
| newBar.incrementCount(rightBar.getCount()); // count of rightBar without its blocked bars |
| // this just adds the leftBar without its blocked bars, as we don't want nested blocked bars |
| // the leftBar's blocked bars are added later below |
| newBar.addBlockedBar(new Bar(leftBar)); |
| } else { |
| newBar.incrementCount(leftBar.getCount()); |
| newBar.addBlockedBar(new Bar(rightBar)); |
| } |
| newBar.addBlockedBars(leftBar.getBlockedBars()); |
| newBar.addBlockedBars(rightBar.getBlockedBars()); |
| bars.subList(currMinIdx, currMinIdx + 2).clear(); // remove minBars |
| bars.add(newBar); |
| Collections.sort(bars); |
| if (LOGGER.isTraceEnabled()) { |
| LOGGER.trace(String.format("Merged left=%s , right=%s , newBar=%s", leftBar, rightBar, newBar)); |
| } |
| return true; |
| } |
| |
| /** |
| * Get the appropriate bar for the value, extending existing bar bounds to accommodate if necessary |
| * @param value value to add |
| * @return the bar for the value |
| */ |
| @VisibleForTesting |
| Bar getBar(byte[] value) { |
| Bar searchKey = new Bar(value, value); |
| int searchIdx = Collections.binarySearch(this.bars, searchKey); |
| if (searchIdx < 0) { |
| // copy value so later changes by caller don't affect histogram results |
| byte[] newBound = Bytes.copy(value); |
| if (this.bars.size() == 0) { |
| Bar firstBar = new Bar(newBound, newBound); |
| bars.add(firstBar); |
| return firstBar; |
| } |
| int expectedIndex = Math.abs(searchIdx + 1); // jdk binary search index |
| if (expectedIndex == bars.size()) { // no bars >= value, need to extend rightBound of last bar |
| Bar lastBar = bars.get(expectedIndex - 1); |
| lastBar.setRightBoundExclusive(newBound); // actually inclusive for last bar |
| return lastBar; |
| } else { // extend leftBound of next greatest bar |
| Bar nextBar = bars.get(expectedIndex); |
| nextBar.setLeftBoundInclusive(newBound); |
| return nextBar; |
| } |
| } else { |
| return bars.get(searchIdx); |
| } |
| } |
| |
| private long getMaxBarSize() { |
| // from the paper, 1.7 has been "determined empirically" |
| // interpretation: We don't want a given bar to deviate more than 70% from its ideal target size |
| return (long) (MAX_COEF * (totalCount / maxBars)); |
| } |
| |
| public static class Bucket { |
| protected long count = 0; |
| protected byte[] leftBoundInclusive; |
| protected byte[] rightBoundExclusive; |
| |
| public Bucket(byte[] leftBoundInclusive, byte[] rightBoundExclusive) { |
| this.leftBoundInclusive = leftBoundInclusive; |
| this.rightBoundExclusive = rightBoundExclusive; |
| } |
| |
| public byte[] getLeftBoundInclusive() { |
| return leftBoundInclusive; |
| } |
| |
| public void setLeftBoundInclusive(byte[] leftBoundInclusive) { |
| this.leftBoundInclusive = leftBoundInclusive; |
| } |
| |
| public byte[] getRightBoundExclusive() { |
| return rightBoundExclusive; |
| } |
| |
| public void setRightBoundExclusive(byte[] rightBoundExclusive) { |
| this.rightBoundExclusive = rightBoundExclusive; |
| } |
| |
| public long getCountEstimate() { |
| return count; |
| } |
| |
| public void incrementCountEstimate(long count) { |
| this.count += count; |
| } |
| |
| @Override |
| public int hashCode() { |
| final int prime = 31; |
| int result = 1; |
| result = prime * result + Arrays.hashCode(leftBoundInclusive); |
| result = prime * result + Arrays.hashCode(rightBoundExclusive); |
| return result; |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (this == obj) return true; |
| if (obj == null) return false; |
| if (getClass() != obj.getClass()) return false; |
| Bucket other = (Bucket) obj; |
| if (!Arrays.equals(leftBoundInclusive, other.leftBoundInclusive)) return false; |
| if (!Arrays.equals(rightBoundExclusive, other.rightBoundExclusive)) return false; |
| return true; |
| } |
| |
| @Override |
| public String toString() { |
| return "Bucket [count=" + count + ", leftBoundInclusive=" |
| + Bytes.toString(leftBoundInclusive) + ", rightBoundExclusive=" |
| + Bytes.toString(rightBoundExclusive) + "]"; |
| } |
| } |
| |
| // Used internally to further subdivide each bucket |
| @VisibleForTesting |
| static class Bar extends Bucket implements Comparable<Bar> { |
| private List<Bar> blockedBars = new ArrayList<>(); // populated through a merge |
| |
| /** |
| * Create a new bar. Single value buckets can have leftBound = rightBound |
| * @param leftBoundInclusive |
| * @param rightBoundExclusive |
| */ |
| public Bar(byte[] leftBoundInclusive, byte[] rightBoundExclusive) { |
| super(leftBoundInclusive, rightBoundExclusive); |
| } |
| |
| /** |
| * Creates a copy of the passed in bar, but without any blocked bars |
| * @param bar |
| */ |
| public Bar(Bar bar) { |
| super(bar.leftBoundInclusive, bar.rightBoundExclusive); |
| this.count = bar.count; |
| } |
| |
| // Used to keep the bars sorted by bounds |
| @Override |
| public int compareTo(Bar other) { |
| // if one bar fully contains the other, they are considered the same. For binary search |
| int leftComp = Bytes.compareTo(this.leftBoundInclusive, other.leftBoundInclusive); |
| int rightComp = Bytes.compareTo(this.rightBoundExclusive, other.rightBoundExclusive); |
| if ((leftComp >= 0 && rightComp < 0) || (leftComp <= 0 && rightComp > 0) |
| || (leftComp == 0 && rightComp == 0)) { |
| return 0; |
| } |
| if (Bytes.compareTo(this.leftBoundInclusive, other.rightBoundExclusive) >= 0) { |
| return 1; |
| } |
| if (Bytes.compareTo(this.rightBoundExclusive, other.leftBoundInclusive) <= 0) { |
| return -1; |
| } |
| throw new AssertionError("Cannot not have overlapping bars"); |
| } |
| |
| /** |
| * @return The aggregate count of this bar and its blocked bars' counts |
| */ |
| public long getSize() { |
| long blockedBarSum = getBlockedBarsSize(); |
| return count + blockedBarSum; |
| } |
| |
| /** |
| * @return The sum of the counts of all the blocked bars |
| */ |
| public long getBlockedBarsSize() { |
| long blockedBarSum = 0; |
| for (Bar bb : blockedBars) { |
| blockedBarSum += bb.getSize(); |
| } |
| return blockedBarSum; |
| } |
| |
| public void addBlockedBar(Bar bar) { |
| blockedBars.add(bar); |
| } |
| |
| public void addBlockedBars(List<Bar> bars) { |
| blockedBars.addAll(bars); |
| } |
| |
| public List<Bar> getBlockedBars() { |
| return blockedBars; |
| } |
| |
| public long getCount() { |
| return this.count; |
| } |
| |
| public void incrementCount() { |
| count++; |
| } |
| |
| public void incrementCount(long increment) { |
| count += increment; |
| } |
| |
| @Override |
| public int hashCode() { |
| final int prime = 31; |
| int result = super.hashCode(); |
| result = prime * result + ((blockedBars == null) ? 0 : blockedBars.hashCode()); |
| result = prime * result + (int) (count ^ (count >>> 32)); |
| return result; |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (this == obj) return true; |
| if (!super.equals(obj)) return false; |
| if (getClass() != obj.getClass()) return false; |
| Bar other = (Bar) obj; |
| if (blockedBars == null) { |
| if (other.blockedBars != null) return false; |
| } else if (!blockedBars.equals(other.blockedBars)) return false; |
| if (count != other.count) return false; |
| return true; |
| } |
| |
| @Override |
| public String toString() { |
| return "Bar[count=" + count + ", blockedBars=" + blockedBars + ", leftBoundInclusive=" |
| + Bytes.toString(leftBoundInclusive) + ", rightBoundExclusive=" |
| + Bytes.toString(rightBoundExclusive) + "]"; |
| } |
| } |
| } |