blob: 42f9acab9d6320306ab264d9b383e1efa4e1e94c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.datasketches.server;
import org.apache.datasketches.Family;
import org.apache.datasketches.SketchesException;
import org.apache.datasketches.cpc.CpcSketch;
import org.apache.datasketches.frequencies.ErrorType;
import org.apache.datasketches.frequencies.ItemsSketch;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.kll.KllFloatsSketch;
import org.apache.datasketches.sampling.ReservoirItemsSketch;
import org.apache.datasketches.sampling.VarOptItemsSamples;
import org.apache.datasketches.sampling.VarOptItemsSketch;
import org.apache.datasketches.theta.CompactSketch;
import org.apache.datasketches.theta.Union;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import static org.apache.datasketches.server.SketchConstants.*;
/**
* Allows querying the sketches to obtain estimates of the relevant quantities.
*
* Full description to be added later.
*/
public class DataQueryHandler extends BaseSketchesQueryHandler {
public DataQueryHandler(final SketchStorage sketches) {
super(sketches, false);
}
@Override
protected JsonObject processQuery(final JsonObject query) {
if (!query.has(QUERY_NAME_FIELD)) {
throw new IllegalArgumentException("Query missing sketch name field");
}
final String key = query.get(QUERY_NAME_FIELD).getAsString();
if (!sketches.contains(key)) {
throw new IllegalArgumentException("Invalid sketch name: " + key);
}
final SketchStorage.SketchEntry se = sketches.getSketch(key);
JsonObject result = new JsonObject();
// pre-populate with the sketch name, but may be overwritten with
// null depending on the queyr
result.addProperty(QUERY_NAME_FIELD, key);
switch (se.family) {
case UNION:
case HLL:
case CPC:
result = processDistinctQuery(result, query, se.family, se.sketch);
break;
case KLL:
result = processQuantilesQuery(result, query, se.family, se.sketch);
break;
case FREQUENCY:
result = processFrequencyQuery(result, query, se.family, se.sketch);
break;
case RESERVOIR:
case VAROPT:
result = processSamplingQuery(result, query, se.family, se.sketch);
break;
default:
throw new IllegalStateException("Unexpected sketch family: " + se.family);
}
return result;
}
private static boolean checkSummaryFlag(final JsonObject query) {
boolean addSummary = false;
if (query != null && query.has(QUERY_SUMMARY_FIELD)) {
addSummary = query.get(QUERY_SUMMARY_FIELD).getAsBoolean();
}
return addSummary;
}
private static JsonObject processDistinctQuery(final JsonObject result, final JsonObject query, final Family type, final Object sketch) {
if (query == null || type == null || sketch == null) {
return null;
}
// check if we need a summary
final boolean addSummary = checkSummaryFlag(query);
String summary = null;
final double estimate;
final boolean isEstimationMode;
final double p1StdDev;
final double p2StdDev;
final double p3StdDev;
final double m1StdDev;
final double m2StdDev;
final double m3StdDev;
switch (type) {
case UNION:
final CompactSketch thetaSketch = ((Union) sketch).getResult();
isEstimationMode = thetaSketch.isEstimationMode();
estimate = thetaSketch.getEstimate();
p1StdDev = thetaSketch.getUpperBound(1);
p2StdDev = thetaSketch.getUpperBound(2);
p3StdDev = thetaSketch.getUpperBound(3);
m1StdDev = thetaSketch.getLowerBound(1);
m2StdDev = thetaSketch.getLowerBound(2);
m3StdDev = thetaSketch.getLowerBound(3);
if (addSummary) { summary = thetaSketch.toString(); }
break;
case CPC:
final CpcSketch cpcSketch = (CpcSketch) sketch;
isEstimationMode = true; // no exact mode
estimate = cpcSketch.getEstimate();
p1StdDev = cpcSketch.getUpperBound(1);
p2StdDev = cpcSketch.getUpperBound(2);
p3StdDev = cpcSketch.getUpperBound(3);
m1StdDev = cpcSketch.getLowerBound(1);
m2StdDev = cpcSketch.getLowerBound(2);
m3StdDev = cpcSketch.getLowerBound(3);
if (addSummary) { summary = cpcSketch.toString(); }
break;
case HLL:
final HllSketch hllSketch = (HllSketch) sketch;
isEstimationMode = hllSketch.isEstimationMode();
estimate = hllSketch.getEstimate();
p1StdDev = hllSketch.getUpperBound(1);
p2StdDev = hllSketch.getUpperBound(2);
p3StdDev = hllSketch.getUpperBound(3);
m1StdDev = hllSketch.getLowerBound(1);
m2StdDev = hllSketch.getLowerBound(2);
m3StdDev = hllSketch.getLowerBound(3);
if (addSummary) { summary = hllSketch.toString(); }
break;
default:
throw new IllegalArgumentException("Unknown distinct counting sketch type: " + type);
}
//final JsonObject result = new JsonObject();
result.addProperty(RESPONSE_ESTIMATE_FIELD, estimate);
result.addProperty(RESPONSE_ESTIMATION_MODE_FIELD, isEstimationMode);
result.addProperty(RESPONSE_P1STDEV_FIELD, p1StdDev);
result.addProperty(RESPONSE_P2STDEV_FIELD, p2StdDev);
result.addProperty(RESPONSE_P3STDEV_FIELD, p3StdDev);
result.addProperty(RESPONSE_M1STDEV_FIELD, m1StdDev);
result.addProperty(RESPONSE_M2STDEV_FIELD, m2StdDev);
result.addProperty(RESPONSE_M3STDEV_FIELD, m3StdDev);
if (addSummary)
result.addProperty(RESPONSE_SUMMARY_FIELD, summary);
return result;
}
private static JsonObject processQuantilesQuery(final JsonObject result, final JsonObject query, final Family type, final Object sketch) {
if (query == null || type == null || sketch == null) {
return null;
}
// check if we need a summary
final boolean addSummary = checkSummaryFlag(query);
String summary = null;
final boolean isEstimationMode;
final float maxValue;
final float minValue;
final long streamLength;
final double[] fractions = getFractionsArray(query);
float[] quantiles = null;
final float[] values = getValuesArray(query);
final String resultType = getResultType(query);
double[] ranks = null;
// since we know REQ is coming
switch (type) {
case KLL:
final KllFloatsSketch kll = (KllFloatsSketch) sketch;
isEstimationMode = kll.isEstimationMode();
maxValue = kll.getMaxValue();
minValue = kll.getMinValue();
streamLength = kll.getN();
// TODO: consider valuesPMF vs valuesCDF calls to allow a mix?
if (values != null) {
ranks = resultType.equals(QUERY_RESULT_TYPE_CDF) ? kll.getCDF(values) : kll.getPMF(values);
}
if (fractions != null) {
quantiles = kll.getQuantiles(fractions);
}
if (addSummary)
summary = kll.toString();
break;
default:
throw new SketchesException("processQuantilesQuery() received a non-quantiles sketch: " + type);
}
//final JsonObject result = new JsonObject();
result.addProperty(RESPONSE_STREAM_LENGTH, streamLength);
result.addProperty(RESPONSE_ESTIMATION_MODE_FIELD, isEstimationMode);
result.addProperty(RESPONSE_MIN_VALUE, minValue);
result.addProperty(RESPONSE_MAX_VALUE, maxValue);
if (ranks != null) {
final String label = resultType.equals(QUERY_RESULT_TYPE_PMF) ? RESPONSE_RESULT_MASS : RESPONSE_RESULT_RANK;
final JsonArray rankArray = new JsonArray();
for (int i = 0; i < ranks.length; ++i) {
final JsonObject rankPair = new JsonObject();
if (i == values.length) {
rankPair.addProperty(RESPONSE_RESULT_VALUE, maxValue);
} else {
rankPair.addProperty(RESPONSE_RESULT_VALUE, values[i]);
}
rankPair.addProperty(label, ranks[i]);
rankArray.add(rankPair);
}
if (resultType.equals(QUERY_RESULT_TYPE_CDF))
result.add(RESPONSE_CDF_LIST, rankArray);
else
result.add(RESPONSE_PMF_LIST, rankArray);
}
if (quantiles != null) {
final JsonArray quantileArray = new JsonArray();
for (int i = 0; i < quantiles.length; ++i) {
final JsonObject quantilePair = new JsonObject();
quantilePair.addProperty(RESPONSE_RESULT_RANK, fractions[i]);
quantilePair.addProperty(RESPONSE_RESULT_QUANTILE, quantiles[i]);
quantileArray.add(quantilePair);
}
result.add(RESPONSE_QUANTILE_LIST, quantileArray);
}
if (addSummary)
result.addProperty(RESPONSE_SUMMARY_FIELD, summary);
return result;
}
// only one sketch type here so could use ItemsSketch<String> instead of Object, but
// we'll eep the signatures generic here
@SuppressWarnings("unchecked")
private static JsonObject processFrequencyQuery(final JsonObject result, final JsonObject query, final Family type, final Object sketch) {
if (query == null || type != Family.FREQUENCY || sketch == null) {
return null;
}
final ItemsSketch<String> sk = (ItemsSketch<String>) sketch;
// check if we need a summary
final boolean addSummary = checkSummaryFlag(query);
if (!query.has(QUERY_ERRORTYPE_FIELD)) {
throw new SketchesException("Must specify a value for " + QUERY_ERRORTYPE_FIELD
+ " for Frequent Items queries");
}
final ErrorType errorType;
final String errorTypeString = query.get(QUERY_ERRORTYPE_FIELD).getAsString();
if (errorTypeString.equals(QUERY_ERRORTYPE_NO_FP)) {
errorType = ErrorType.NO_FALSE_POSITIVES;
} else if (errorTypeString.equals(QUERY_ERRORTYPE_NO_FN)) {
errorType = ErrorType.NO_FALSE_NEGATIVES;
} else {
throw new SketchesException("Unknown Frequent Items ErrorType: " + errorTypeString);
}
final ItemsSketch.Row<String>[] items = sk.getFrequentItems(errorType);
final JsonArray itemArray = new JsonArray();
for (final ItemsSketch.Row<String> item : items) {
final JsonObject row = new JsonObject();
row.addProperty(RESPONSE_ITEM_VALUE, item.getItem());
row.addProperty(RESPONSE_ITEM_ESTIMATE, item.getEstimate());
row.addProperty(RESPONSE_ITEM_UPPER_BOUND, item.getUpperBound());
row.addProperty(RESPONSE_ITEM_LOWER_BOUND, item.getLowerBound());
itemArray.add(row);
}
//final JsonObject result = new JsonObject();
result.add(RESPONSE_ITEMS_ARRAY, itemArray);
if (addSummary)
result.addProperty(RESPONSE_SUMMARY_FIELD, sk.toString());
return result;
}
@SuppressWarnings("unchecked")
private static JsonObject processSamplingQuery(final JsonObject result, final JsonObject query, final Family type, final Object sketch) {
if (query == null || type == null || sketch == null) {
return null;
}
// check if we need a summary
final boolean addSummary = checkSummaryFlag(query);
String summary = null;
final long streamWeight;
final int k;
final JsonArray itemArray = new JsonArray();
switch (type) {
case RESERVOIR:
final ReservoirItemsSketch<String> ris = (ReservoirItemsSketch<String>) sketch;
for (final String item : ris.getSamples()) {
itemArray.add(item);
}
streamWeight = ris.getN();
k = ris.getK();
if (addSummary)
summary = ris.toString();
break;
case VAROPT:
final VarOptItemsSketch<String> vis = (VarOptItemsSketch<String>) sketch;
for (final VarOptItemsSamples<String>.WeightedSample ws : vis.getSketchSamples()) {
final JsonObject item = new JsonObject();
item.addProperty(RESPONSE_ITEM_VALUE, ws.getItem());
item.addProperty(RESPONSE_ITEM_WEIGHT, ws.getWeight());
itemArray.add(item);
}
streamWeight = vis.getN();
k = vis.getK();
if (addSummary)
summary = vis.toString();
break;
default:
throw new SketchesException("processSamplingQuery() received a non-sampling sketch: " + type);
}
//final JsonObject result = new JsonObject();
result.addProperty(RESPONSE_SKETCH_K, k);
result.addProperty(RESPONSE_STREAM_WEIGHT, streamWeight);
result.add(RESPONSE_ITEMS_ARRAY, itemArray);
if (addSummary)
result.addProperty(RESPONSE_SUMMARY_FIELD, summary);
return result;
}
// returns an array of rank points, or null if none in query
private static float[] getValuesArray(final JsonObject query) {
if (query == null || !query.has(QUERY_VALUES_FIELD_NAME)) {
return null;
}
final JsonArray valuesArray = query.get(QUERY_VALUES_FIELD_NAME).getAsJsonArray();
final float[] values = new float[valuesArray.size()];
for (int i = 0; i < values.length; ++i) {
if (!valuesArray.get(i).isJsonPrimitive()) {
throw new SketchesException("Invalid value in array. Must be a floating point value, found: " + valuesArray.get(i));
}
values[i] = valuesArray.get(i).getAsFloat();
}
return values;
}
// returns QUERY_RESULT_TYPE_PMF if specified in QUERY_RESULT_TYPE_NAME_FIELD, otherwise returns default of
// QUERY_RESULT_TYPE_CDF
private static String getResultType(final JsonObject query) {
if (query == null || !query.has(QUERY_RESULT_TYPE_NAME_FIELD)) {
return QUERY_RESULT_TYPE_CDF;
}
final JsonElement resultTypeElement = query.get(QUERY_RESULT_TYPE_NAME_FIELD);
if (!resultTypeElement.isJsonPrimitive())
return QUERY_RESULT_TYPE_CDF;
final String resultType = resultTypeElement.getAsString().toLowerCase();
if (resultType.equals(QUERY_RESULT_TYPE_PMF))
return QUERY_RESULT_TYPE_PMF;
else
return QUERY_RESULT_TYPE_CDF;
}
// returns an array of provided split points, or null if none in query
private static double[] getFractionsArray(final JsonObject query) {
if (query == null || !query.has(QUERY_FRACTIONS_NAME_FIELD)) {
return null;
}
final JsonArray fractionsArray = query.get(QUERY_FRACTIONS_NAME_FIELD).getAsJsonArray();
final double[] fractions = new double[fractionsArray.size()];
for (int i = 0; i < fractions.length; ++i) {
if (!fractionsArray.get(i).isJsonPrimitive()) {
throw new SketchesException("Invalid value in array. Must be float in [0.0, 1.0], found: " + fractionsArray.get(i));
}
fractions[i] = fractionsArray.get(i).getAsFloat();
}
return fractions;
}
}