/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.common;

import java.util.ArrayList;
import java.util.List;

import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.drill.metastore.statistics.Histogram;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import com.tdunning.math.stats.MergingDigest;

/**
* A column-specific equi-depth histogram meant for numeric data types.
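* <p>
* An illustrative usage sketch (the variable names here are placeholders, not part of this class):
* <pre>{@code
* NumericEquiDepthHistogram hist =
*     NumericEquiDepthHistogram.buildFromTDigest(tdigestBytes, 10, nonNullCount);
* Double selectivity = hist.estimatedSelectivity(columnFilter, totalRowCount, ndv);
* }</pre>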
*/
@JsonTypeName("numeric-equi-depth")
public class NumericEquiDepthHistogram implements Histogram {
/**
* Use a small non-zero selectivity rather than 0 because histogram boundaries are
* approximate: even when a value appears to lie outside the histogram's range, we
* cannot be absolutely sure that no rows match
*/
private static final double SMALL_SELECTIVITY = 0.0001;
/** For equi-depth, all buckets will have same (approx) number of rows */
@JsonProperty("numRowsPerBucket")
private double numRowsPerBucket;
/** An array of buckets arranged in increasing order of their start boundaries.
* Note that the buckets only maintain the start point of the bucket range;
* the end point is assumed to be the same as the start point of the next bucket, although
* when evaluating the filter selectivity we should treat the interval as [start, end),
* i.e. closed on the start and open on the end
*/
@JsonProperty("buckets")
private Double[] buckets;
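// Example: buckets = {1.0, 5.0, 9.0} encodes two buckets, [1.0, 5.0) and [5.0, 9.0),
// with each bucket holding approximately numRowsPerBucket rows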
// Default constructor for deserializer
public NumericEquiDepthHistogram() {}
public NumericEquiDepthHistogram(int numBuckets) {
// If numBuckets = N, we are keeping N + 1 entries since the (N+1)th bucket's
// starting value is the MAX value for the column and it becomes the end point of the
// Nth bucket.
buckets = new Double[numBuckets + 1];
for (int i = 0; i < buckets.length; i++) {
buckets[i] = 0.0;
}
numRowsPerBucket = -1;
}
public double getNumRowsPerBucket() {
return numRowsPerBucket;
}
public void setNumRowsPerBucket(double numRows) {
this.numRowsPerBucket = numRows;
}
public Double[] getBuckets() {
return buckets;
}
@VisibleForTesting
protected void setBucketValue(int index, Double value) {
buckets[index] = value;
}
/**
* Get the number of buckets in the histogram.
* The number of buckets is 1 less than the total number of entries in the buckets array,
* since the last entry is the end point of the last bucket
*/
@JsonIgnore
public int getNumBuckets() {
return buckets.length - 1;
}
/**
* Estimate the selectivity of a filter which may contain several range predicates and in the general case is of
* type: col op value1 AND col op value2 AND col op value3 ...
* <p>
* e.g. a > 10 AND a < 50 AND a >= 20 AND a <= 70 ...
* </p>
* Even though in most cases it will have either 1 or 2 range conditions, we still have to handle the general case.
* For each conjunct, we will find the histogram bucket ranges and intersect them, taking into account that the
* first and last bucket may be partially covered and all other buckets in the middle are fully covered.
*/
@Override
public Double estimatedSelectivity(final RexNode columnFilter, final long totalRowCount, final long ndv) {
if (numRowsPerBucket == 0) {
return null;
}
// at a minimum, the histogram should have a start and end point of 1 bucket, so at least 2 entries
Preconditions.checkArgument(buckets.length >= 2, "Histogram has invalid number of entries");
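// break the filter into its top-level conjuncts; each conjunct contributes a value
// range and the ranges of all conjuncts are intersected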
List<RexNode> filterList = RelOptUtil.conjunctions(columnFilter);
Range<Double> fullRange = Range.all();
List<RexNode> unknownFilterList = new ArrayList<>();
Range<Double> valuesRange = getValuesRange(filterList, fullRange, unknownFilterList);
long numSelectedRows;
// unknown counter is a count of filter predicates whose bucket ranges cannot be
// determined from the histogram; this may happen, for instance, when there is an expression or
// function involved, e.g. col > CAST('10' AS INT)
int unknown = unknownFilterList.size();
if (valuesRange.hasLowerBound() || valuesRange.hasUpperBound()) {
numSelectedRows = getSelectedRows(valuesRange, ndv);
} else {
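// no usable bounds could be derived from the filter (e.g. every conjunct was 'unknown'),
// so fall through to the small default selectivity below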
numSelectedRows = 0;
}
if (numSelectedRows <= 0) {
return SMALL_SELECTIVITY;
} else {
// for each 'unknown' range filter selectivity, use a default of 0.5 (matches Calcite)
double scaleFactor = Math.pow(0.5, unknown);
return ((double) numSelectedRows / totalRowCount) * scaleFactor;
}
}
private Range<Double> getValuesRange(List<RexNode> filterList, Range<Double> fullRange, List<RexNode> unknownFilterList) {
Range<Double> currentRange = fullRange;
for (RexNode filter : filterList) {
if (filter instanceof RexCall) {
Double value = getLiteralValue(filter);
if (value == null) {
unknownFilterList.add(filter);
} else {
// get the operator
SqlOperator op = ((RexCall) filter).getOperator();
Range<Double> range;
switch (op.getKind()) {
case GREATER_THAN:
range = Range.greaterThan(value);
currentRange = currentRange.intersection(range);
break;
case GREATER_THAN_OR_EQUAL:
range = Range.atLeast(value);
currentRange = currentRange.intersection(range);
break;
case LESS_THAN:
range = Range.lessThan(value);
currentRange = currentRange.intersection(range);
break;
case LESS_THAN_OR_EQUAL:
range = Range.atMost(value);
currentRange = currentRange.intersection(range);
break;
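// operators other than the four range comparisons above (e.g. EQUALS) neither narrow
// the range nor count towards the unknown filter list; they are simply skipped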
default:
break;
}
}
}
}
return currentRange;
}
@VisibleForTesting
protected long getSelectedRows(final Range range, final long ndv) {
double startBucketFraction = 1.0;
double endBucketFraction = 1.0;
long numRows = 0;
int result;
Double lowValue = null;
Double highValue = null;
final int firstStartPointIndex = 0;
final int lastEndPointIndex = buckets.length - 1;
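// start by assuming the range covers all buckets; the bound checks below narrow this down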
int startBucket = firstStartPointIndex;
int endBucket = lastEndPointIndex - 1;
if (range.hasLowerBound()) {
lowValue = (Double) range.lowerEndpoint();
// if low value is greater than the end point of the last bucket or if it is equal but the range is open (i.e.
// predicate is of type > 5 where 5 is the end point of the last bucket) then none of the rows qualify
result = lowValue.compareTo(buckets[lastEndPointIndex]);
if (result > 0 || (result == 0 && range.lowerBoundType() == BoundType.OPEN)) {
return 0;
}
result = lowValue.compareTo(buckets[firstStartPointIndex]);
// if low value is less than or equal to the first bucket's start point then start with the first bucket and all
// rows in first bucket are included
if (result <= 0) {
startBucket = firstStartPointIndex;
startBucketFraction = 1.0;
} else {
startBucket = getContainingBucket(lowValue, lastEndPointIndex, true);
// expecting start bucket to be >= 0 since other conditions have been handled previously
Preconditions.checkArgument(startBucket >= 0, "Expected start bucket id >= 0");
if (buckets[startBucket + 1].doubleValue() == buckets[startBucket].doubleValue()) {
// if start and end points of the bucket are the same, consider entire bucket
startBucketFraction = 1.0;
} else if (range.lowerBoundType() == BoundType.CLOSED && buckets[startBucket + 1].doubleValue() == lowValue.doubleValue()) {
// predicate is of type >= 5.0 and 5.0 happens to be the end point of the bucket.
// In this case, use the overall NDV to approximate
startBucketFraction = 1.0 / ndv;
} else {
startBucketFraction = ((double) (buckets[startBucket + 1] - lowValue)) / (buckets[startBucket + 1] - buckets[startBucket]);
}
}
}
if (range.hasUpperBound()) {
highValue = (Double) range.upperEndpoint();
// if the high value is less than the start point of the first bucket or if it is equal but the range is open (i.e.
// predicate is of type < 1 where 1 is the start point of the first bucket) then none of the rows qualify
result = highValue.compareTo(buckets[firstStartPointIndex]);
if (result < 0 || (result == 0 && range.upperBoundType() == BoundType.OPEN)) {
return 0;
}
result = highValue.compareTo(buckets[lastEndPointIndex]);
// if high value is greater than or equal to the last bucket's end point then include the last bucket and all rows in
// last bucket qualify
if (result >= 0) {
endBucket = lastEndPointIndex - 1;
endBucketFraction = 1.0;
} else {
endBucket = getContainingBucket(highValue, lastEndPointIndex, false);
// expecting end bucket to be >= 0 since other conditions have been handled previously
Preconditions.checkArgument(endBucket >= 0, "Expected end bucket id >= 0");
if (buckets[endBucket + 1].doubleValue() == buckets[endBucket].doubleValue()) {
// if start and end points of the bucket are the same, consider entire bucket
endBucketFraction = 1.0;
} else if (range.upperBoundType() == BoundType.CLOSED && buckets[endBucket].doubleValue() == highValue.doubleValue()) {
// predicate is of type <= 5.0 and 5.0 happens to be the start point of the bucket
// In this case, use the overall NDV to approximate
endBucketFraction = 1.0 / ndv;
} else {
endBucketFraction = ((double) (highValue - buckets[endBucket])) / (buckets[endBucket + 1] - buckets[endBucket]);
}
}
}
Preconditions.checkArgument(startBucket >= 0 && startBucket + 1 <= lastEndPointIndex, "Invalid startBucket: " + startBucket);
Preconditions.checkArgument(endBucket >= 0 && endBucket + 1 <= lastEndPointIndex, "Invalid endBucket: " + endBucket);
Preconditions.checkArgument(startBucket <= endBucket,
"Start bucket: " + startBucket + " should be less than or equal to end bucket: " + endBucket);
if (startBucket == endBucket) {
// if the start and end buckets are the same, interpolate based on the difference between the high and low value
if (highValue != null && lowValue != null) {
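// assume values are uniformly distributed within the bucket and interpolate linearly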
numRows = (long) ((highValue - lowValue) / (buckets[startBucket + 1] - buckets[startBucket]) * numRowsPerBucket);
} else if (highValue != null) {
numRows = (long) (endBucketFraction * numRowsPerBucket);
} else {
numRows = (long) (startBucketFraction * numRowsPerBucket);
}
} else {
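// sum the partially covered first and last buckets plus all fully covered buckets in between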
int numIntermediateBuckets = (endBucket > startBucket + 1) ? (endBucket - startBucket - 1) : 0;
numRows = (long) ((startBucketFraction + endBucketFraction) * numRowsPerBucket + numIntermediateBuckets * numRowsPerBucket);
}
return numRows;
}
/**
* Get the start point of the containing bucket for the supplied value. If there are multiple buckets with the
* same start point, return either the first or the last matching one depending on the firstMatching flag.
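* e.g. for buckets {1.0, 5.0, 5.0, 9.0} and value 5.0, firstMatching = true returns index 1
* whereas firstMatching = false returns index 2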
* @param value the input double value
* @param lastEndPointIndex index of the last entry in the buckets array (the end point of the last bucket)
* @param firstMatching if true, return the first bucket that matches the specified criteria; otherwise return the last one
* @return index of either the first or last matching bucket if a match was found, otherwise return -1
*/
private int getContainingBucket(final Double value, final int lastEndPointIndex, final boolean firstMatching) {
int i = 0;
int containingBucket = -1;
// check which bucket this value falls in
for (; i <= lastEndPointIndex; i++) {
int result = buckets[i].compareTo(value);
if (result > 0) {
containingBucket = i - 1;
break;
} else if (result == 0) {
// if we are already at the lastEndPointIndex, mark the containing bucket
// as i-1 because the containing bucket should correspond to the start point of the bucket
// (recall that lastEndPointIndex is the final end point of the last bucket)
containingBucket = (i == lastEndPointIndex) ? i - 1 : i;
if (firstMatching) {
// break if we are only interested in the first matching bucket
break;
}
}
}
return containingBucket;
}
private Double getLiteralValue(final RexNode filter) {
Double value = null;
List<RexNode> operands = ((RexCall) filter).getOperands();
if (operands.size() == 2 && operands.get(1) instanceof RexLiteral) {
RexLiteral l = ((RexLiteral) operands.get(1));
switch (l.getTypeName()) {
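// temporal literals are held as java.util.Calendar values; use their epoch
// millisecond value so they can be compared against the numeric bucket boundaries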
case DATE:
case TIMESTAMP:
case TIME:
value = (double) ((java.util.Calendar) l.getValue()).getTimeInMillis();
break;
case INTEGER:
case BIGINT:
case FLOAT:
case DOUBLE:
case DECIMAL:
case BOOLEAN:
value = l.getValueAs(Double.class);
break;
default:
break;
}
}
return value;
}
/**
* Build a numeric equi-depth histogram from a t-digest byte array
* @param tdigestBytes the serialized t-digest bytes
* @param numBuckets the desired number of histogram buckets
* @param nonNullCount the total number of non-null values, possibly extrapolated from a sample
* @return an instance of NumericEquiDepthHistogram
*/
public static NumericEquiDepthHistogram buildFromTDigest(final byte[] tdigestBytes,
final int numBuckets,
final long nonNullCount) {
MergingDigest tdigest = MergingDigest.fromBytes(java.nio.ByteBuffer.wrap(tdigestBytes));
NumericEquiDepthHistogram histogram = new NumericEquiDepthHistogram(numBuckets);
final double q = 1.0 / numBuckets;
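// each bucket spans a quantile width of q, e.g. numBuckets = 10 yields decile boundaries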
int i = 0;
for (; i < numBuckets; i++) {
// get the starting point of the i-th quantile
double start = tdigest.quantile(q * i);
histogram.buckets[i] = start;
}
// for the N-th bucket, the end point corresponds to the 1.0 quantile, but since we only
// keep start points, it is stored as the start point of the (N+1)-th bucket
histogram.buckets[i] = tdigest.quantile(1.0);
// Each bucket stores approx equal number of rows. Here, we take into consideration the nonNullCount
// supplied since the stats may have been collected with sampling. Sampling of 20% means only 20% of the
// tuples will be stored in the t-digest. However, the overall stats such as totalRowCount, nonNullCount and
// NDV would have already been extrapolated up from the sample. So, we take the max of the t-digest size and
// the supplied nonNullCount. Why non-null ? Because only non-null values are stored in the t-digest.
double numRowsPerBucket = (double) Math.max(tdigest.size(), nonNullCount) / numBuckets;
histogram.setNumRowsPerBucket(numRowsPerBucket);
return histogram;
}
}