blob: 16a235c66566788c11deb4aeade70362a611bea0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.query.aggregation.datasketches.quantiles;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.datasketches.quantiles.DoublesSketch;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.AggregatorUtil;
import org.apache.druid.query.aggregation.PostAggregator;
import org.apache.druid.query.cache.CacheKeyBuilder;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import java.util.Set;
/**
 * Post-aggregator that converts the {@link DoublesSketch} produced by {@code field} into a histogram.
 *
 * <p>The result is a {@code double[]} of counts: the sketch's PMF scaled by {@link DoublesSketch#getN()}.
 * Bin boundaries come from one of two sources, and at most one of them may be supplied:
 * <ul>
 *   <li>explicit {@code splitPoints}, giving {@code splitPoints.length + 1} bins;</li>
 *   <li>{@code numBins} equally-spaced intervals over the sketch's [min, max] range
 *   (default {@value #DEFAULT_NUM_BINS}).</li>
 * </ul>
 */
public class DoublesSketchToHistogramPostAggregator implements PostAggregator
{
  static final int DEFAULT_NUM_BINS = 10;

  private final String name;
  private final PostAggregator field;
  @Nullable
  private final double[] splitPoints;
  @Nullable
  private final Integer numBins;

  /**
   * @param name        output name of this post-aggregator; must not be null
   * @param field       post-aggregator whose result is a {@link DoublesSketch}; must not be null
   * @param splitPoints optional explicit bin boundaries (copied); mutually exclusive with {@code numBins}
   * @param numBins     optional number of equally-spaced bins; mutually exclusive with {@code splitPoints}
   * @throws NullPointerException if {@code name} or {@code field} is null
   * @throws IAE                  if both {@code splitPoints} and {@code numBins} are given
   */
  @JsonCreator
  public DoublesSketchToHistogramPostAggregator(
      @JsonProperty("name") final String name,
      @JsonProperty("field") final PostAggregator field,
      @JsonProperty("splitPoints") @Nullable final double[] splitPoints,
      @JsonProperty("numBins") @Nullable final Integer numBins)
  {
    this.name = Preconditions.checkNotNull(name, "name is null");
    this.field = Preconditions.checkNotNull(field, "field is null");
    // Defensive copy: equals/hashCode/getCacheKey must not change if the caller later mutates its array.
    this.splitPoints = splitPoints == null ? null : splitPoints.clone();
    this.numBins = numBins;
    if (splitPoints != null && numBins != null) {
      throw new IAE("Cannot accept both 'splitPoints' and 'numBins'");
    }
  }

  /**
   * Computes the histogram of the sketch produced by {@code field}.
   *
   * @return a {@code double[]} with one count per bin; every entry is {@code NaN} when the sketch is empty
   * @throws IAE if fewer than 2 bins would result (an empty {@code splitPoints} array or {@code numBins < 2})
   */
  @Override
  public Object compute(final Map<String, Object> combinedAggregators)
  {
    final DoublesSketch sketch = (DoublesSketch) field.compute(combinedAggregators);
    final int binCount;
    if (splitPoints != null) {
      binCount = splitPoints.length + 1;
    } else if (numBins != null) {
      binCount = numBins.intValue();
    } else {
      binCount = DEFAULT_NUM_BINS;
    }
    if (binCount < 2) {
      throw new IAE("at least 2 bins expected");
    }
    if (sketch.isEmpty()) {
      final double[] histogram = new double[binCount];
      Arrays.fill(histogram, Double.NaN);
      return histogram;
    }
    final double[] histogram;
    if (splitPoints != null) {
      // User-supplied boundaries are passed through unchanged; the sketch validates them.
      histogram = sketch.getPMF(splitPoints);
    } else {
      // Generated boundaries may contain repeats. This happens when min == max (e.g. a single-row sketch
      // or a constant column), or when the range is too narrow for doubles to resolve. getPMF does not
      // accept repeated split points, so route through a helper that tolerates them.
      histogram = pmfWithNonDecreasingSplitPoints(
          sketch,
          equallySpacedPoints(binCount, sketch.getMinValue(), sketch.getMaxValue())
      );
    }
    for (int i = 0; i < histogram.length; i++) {
      histogram[i] *= sketch.getN(); // scale fractions to counts
    }
    return histogram;
  }

  /**
   * Returns the sketch's PMF for split points that are non-decreasing but may contain repeats.
   * The result has {@code points.length + 1} entries, exactly as {@link DoublesSketch#getPMF(double[])} would.
   *
   * <p>Bins between two distinct consecutive points receive the mass the sketch reports for that interval.
   * A bin bounded by two equal points is empty and receives 0. When {@code points} has no repeats, this is
   * simply {@code sketch.getPMF(points)}.
   */
  private static double[] pmfWithNonDecreasingSplitPoints(final DoublesSketch sketch, final double[] points)
  {
    // Collapse runs of equal points into a strictly increasing array.
    final double[] unique = new double[points.length];
    int uniqueCount = 0;
    for (final double point : points) {
      if (uniqueCount == 0 || point != unique[uniqueCount - 1]) {
        unique[uniqueCount++] = point;
      }
    }
    if (uniqueCount == points.length) {
      return sketch.getPMF(points); // common case: nothing to collapse
    }
    final double[] uniquePmf = sketch.getPMF(Arrays.copyOf(unique, uniqueCount));

    // Expand the uniqueCount + 1 bins back to points.length + 1 bins.
    final double[] pmf = new double[points.length + 1];
    pmf[0] = uniquePmf[0];
    int uniqueBin = 0;
    for (int i = 1; i < points.length; i++) {
      if (points[i] != points[i - 1]) {
        uniqueBin++;
        pmf[i] = uniquePmf[uniqueBin];
      }
      // otherwise bin i is bounded by two equal points: it is empty and stays 0.0
    }
    pmf[points.length] = uniquePmf[uniqueBin + 1];
    return pmf;
  }

  /**
   * Returns {@code num - 1} points that split [min, max] into {@code num} equally-spaced intervals.
   * The points are non-decreasing. They may repeat when {@code min == max}, or when the interval is too
   * narrow to be resolved in double precision.
   *
   * @param num number of intervals; must be at least 2
   */
  private static double[] equallySpacedPoints(final int num, final double min, final double max)
  {
    final double[] points = new double[num - 1];
    final double delta = (max - min) / num;
    for (int i = 0; i < num - 1; i++) {
      points[i] = min + delta * (i + 1);
    }
    return points;
  }

  @Override
  @JsonProperty
  public String getName()
  {
    return name;
  }

  @JsonProperty
  public PostAggregator getField()
  {
    return field;
  }

  /** Returns a copy of the explicit split points, or null if {@code numBins} or the default is used. */
  @JsonProperty
  @JsonInclude(JsonInclude.Include.NON_NULL)
  @Nullable
  public double[] getSplitPoints()
  {
    return splitPoints == null ? null : splitPoints.clone();
  }

  /** Returns the requested number of bins, or null if it was not specified. */
  @JsonProperty
  @JsonInclude(JsonInclude.Include.NON_NULL)
  @Nullable
  public Integer getNumBins()
  {
    return numBins;
  }

  /** Histograms have no natural ordering, so comparison is rejected. */
  @Override
  public Comparator<double[]> getComparator()
  {
    throw new IAE("Comparing histograms is not supported");
  }

  @Override
  public Set<String> getDependentFields()
  {
    return field.getDependentFields();
  }

  @Override
  public String toString()
  {
    return getClass().getSimpleName() + "{" +
           "name='" + name + '\'' +
           ", field=" + field +
           ", splitPoints=" + Arrays.toString(splitPoints) +
           ", numBins=" + numBins +
           "}";
  }

  @Override
  public boolean equals(final Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    final DoublesSketchToHistogramPostAggregator that = (DoublesSketchToHistogramPostAggregator) o;
    if (!name.equals(that.name)) {
      return false;
    }
    if (!Arrays.equals(splitPoints, that.splitPoints)) {
      return false;
    }
    if (!field.equals(that.field)) {
      return false;
    }
    return numBins == null ? that.numBins == null : numBins.equals(that.numBins);
  }

  @Override
  public int hashCode()
  {
    int hashCode = name.hashCode() * 31 + field.hashCode();
    hashCode = hashCode * 31 + Arrays.hashCode(splitPoints);
    if (numBins != null) {
      hashCode = hashCode * 31 + numBins.hashCode();
    }
    return hashCode;
  }

  /**
   * Builds the cache key from the cache type id, the field's key, and then either each split point or
   * {@code numBins}, whichever is present. The output name is not included.
   */
  @Override
  public byte[] getCacheKey()
  {
    final CacheKeyBuilder builder = new CacheKeyBuilder(
        AggregatorUtil.QUANTILES_DOUBLES_SKETCH_TO_HISTOGRAM_CACHE_TYPE_ID).appendCacheable(field);
    if (splitPoints != null) {
      for (final double value : splitPoints) {
        builder.appendDouble(value);
      }
    }
    if (numBins != null) {
      builder.appendInt(numBins);
    }
    return builder.build();
  }

  /** No decoration is needed; returns this instance unchanged. */
  @Override
  public PostAggregator decorate(final Map<String, AggregatorFactory> map)
  {
    return this;
  }
}