| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.datasketches.server; |
| |
| import org.apache.datasketches.Family; |
| import org.apache.datasketches.SketchesException; |
| import org.apache.datasketches.cpc.CpcSketch; |
| import org.apache.datasketches.frequencies.ErrorType; |
| import org.apache.datasketches.frequencies.ItemsSketch; |
| import org.apache.datasketches.hll.HllSketch; |
| import org.apache.datasketches.kll.KllFloatsSketch; |
| import org.apache.datasketches.sampling.ReservoirItemsSketch; |
| import org.apache.datasketches.sampling.VarOptItemsSamples; |
| import org.apache.datasketches.sampling.VarOptItemsSketch; |
| import org.apache.datasketches.theta.CompactSketch; |
| import org.apache.datasketches.theta.Union; |
| |
| import com.google.gson.JsonArray; |
| import com.google.gson.JsonElement; |
| import com.google.gson.JsonObject; |
| |
| import static org.apache.datasketches.server.SketchConstants.*; |
| |
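/**
 * Allows querying the sketches to obtain estimates of the relevant quantities.
 *
 * <p>Each query names a stored sketch, and the shape of the response depends on that sketch's
 * family: distinct-count estimates with bounds (theta Union, HLL, CPC), quantiles and ranks (KLL),
 * frequent items, or the retained samples (reservoir, VarOpt).</p>
 *
 * <p>A fuller description is to be added later.</p>
 */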
| public class DataQueryHandler extends BaseSketchesQueryHandler { |
/**
 * Creates a handler that answers data queries against the given sketch storage.
 *
 * @param sketches the storage holding the sketches that queries refer to by name; it is handed
 *     to the base handler together with {@code false} (the meaning of that flag is defined by
 *     {@code BaseSketchesQueryHandler}, which is not visible in this file)
 */
public DataQueryHandler(final SketchStorage sketches) {
  super(sketches, false);
}
| |
| @Override |
| protected JsonObject processQuery(final JsonObject query) { |
| if (!query.has(QUERY_NAME_FIELD)) { |
| throw new IllegalArgumentException("Query missing sketch name field"); |
| } |
| |
| final String key = query.get(QUERY_NAME_FIELD).getAsString(); |
| if (!sketches.contains(key)) { |
| throw new IllegalArgumentException("Invalid sketch name: " + key); |
| } |
| |
| final SketchStorage.SketchEntry se = sketches.getSketch(key); |
| JsonObject result = new JsonObject(); |
| |
| // pre-populate with the sketch name, but may be overwritten with |
| // null depending on the queyr |
| result.addProperty(QUERY_NAME_FIELD, key); |
| |
| switch (se.family) { |
| case UNION: |
| case HLL: |
| case CPC: |
| result = processDistinctQuery(result, query, se.family, se.sketch); |
| break; |
| |
| case KLL: |
| result = processQuantilesQuery(result, query, se.family, se.sketch); |
| break; |
| |
| case FREQUENCY: |
| result = processFrequencyQuery(result, query, se.family, se.sketch); |
| break; |
| |
| case RESERVOIR: |
| case VAROPT: |
| result = processSamplingQuery(result, query, se.family, se.sketch); |
| break; |
| |
| default: |
| throw new IllegalStateException("Unexpected sketch family: " + se.family); |
| } |
| |
| return result; |
| } |
| |
| private static boolean checkSummaryFlag(final JsonObject query) { |
| boolean addSummary = false; |
| if (query != null && query.has(QUERY_SUMMARY_FIELD)) { |
| addSummary = query.get(QUERY_SUMMARY_FIELD).getAsBoolean(); |
| } |
| return addSummary; |
| } |
| |
| private static JsonObject processDistinctQuery(final JsonObject result, final JsonObject query, final Family type, final Object sketch) { |
| if (query == null || type == null || sketch == null) { |
| return null; |
| } |
| |
| // check if we need a summary |
| final boolean addSummary = checkSummaryFlag(query); |
| String summary = null; |
| |
| final double estimate; |
| final boolean isEstimationMode; |
| final double p1StdDev; |
| final double p2StdDev; |
| final double p3StdDev; |
| final double m1StdDev; |
| final double m2StdDev; |
| final double m3StdDev; |
| |
| switch (type) { |
| case UNION: |
| final CompactSketch thetaSketch = ((Union) sketch).getResult(); |
| isEstimationMode = thetaSketch.isEstimationMode(); |
| estimate = thetaSketch.getEstimate(); |
| p1StdDev = thetaSketch.getUpperBound(1); |
| p2StdDev = thetaSketch.getUpperBound(2); |
| p3StdDev = thetaSketch.getUpperBound(3); |
| m1StdDev = thetaSketch.getLowerBound(1); |
| m2StdDev = thetaSketch.getLowerBound(2); |
| m3StdDev = thetaSketch.getLowerBound(3); |
| if (addSummary) { summary = thetaSketch.toString(); } |
| break; |
| |
| case CPC: |
| final CpcSketch cpcSketch = (CpcSketch) sketch; |
| isEstimationMode = true; // no exact mode |
| estimate = cpcSketch.getEstimate(); |
| p1StdDev = cpcSketch.getUpperBound(1); |
| p2StdDev = cpcSketch.getUpperBound(2); |
| p3StdDev = cpcSketch.getUpperBound(3); |
| m1StdDev = cpcSketch.getLowerBound(1); |
| m2StdDev = cpcSketch.getLowerBound(2); |
| m3StdDev = cpcSketch.getLowerBound(3); |
| if (addSummary) { summary = cpcSketch.toString(); } |
| break; |
| |
| case HLL: |
| final HllSketch hllSketch = (HllSketch) sketch; |
| isEstimationMode = hllSketch.isEstimationMode(); |
| estimate = hllSketch.getEstimate(); |
| p1StdDev = hllSketch.getUpperBound(1); |
| p2StdDev = hllSketch.getUpperBound(2); |
| p3StdDev = hllSketch.getUpperBound(3); |
| m1StdDev = hllSketch.getLowerBound(1); |
| m2StdDev = hllSketch.getLowerBound(2); |
| m3StdDev = hllSketch.getLowerBound(3); |
| if (addSummary) { summary = hllSketch.toString(); } |
| break; |
| |
| default: |
| throw new IllegalArgumentException("Unknown distinct counting sketch type: " + type); |
| } |
| |
| //final JsonObject result = new JsonObject(); |
| result.addProperty(RESPONSE_ESTIMATE_FIELD, estimate); |
| result.addProperty(RESPONSE_ESTIMATION_MODE_FIELD, isEstimationMode); |
| result.addProperty(RESPONSE_P1STDEV_FIELD, p1StdDev); |
| result.addProperty(RESPONSE_P2STDEV_FIELD, p2StdDev); |
| result.addProperty(RESPONSE_P3STDEV_FIELD, p3StdDev); |
| result.addProperty(RESPONSE_M1STDEV_FIELD, m1StdDev); |
| result.addProperty(RESPONSE_M2STDEV_FIELD, m2StdDev); |
| result.addProperty(RESPONSE_M3STDEV_FIELD, m3StdDev); |
| if (addSummary) |
| result.addProperty(RESPONSE_SUMMARY_FIELD, summary); |
| |
| return result; |
| } |
| |
| private static JsonObject processQuantilesQuery(final JsonObject result, final JsonObject query, final Family type, final Object sketch) { |
| if (query == null || type == null || sketch == null) { |
| return null; |
| } |
| |
| // check if we need a summary |
| final boolean addSummary = checkSummaryFlag(query); |
| String summary = null; |
| |
| final boolean isEstimationMode; |
| final float maxValue; |
| final float minValue; |
| final long streamLength; |
| |
| final double[] fractions = getFractionsArray(query); |
| float[] quantiles = null; |
| |
| final float[] values = getValuesArray(query); |
| final String resultType = getResultType(query); |
| double[] ranks = null; |
| |
| // since we know REQ is coming |
| switch (type) { |
| case KLL: |
| final KllFloatsSketch kll = (KllFloatsSketch) sketch; |
| isEstimationMode = kll.isEstimationMode(); |
| maxValue = kll.getMaxValue(); |
| minValue = kll.getMinValue(); |
| streamLength = kll.getN(); |
| |
| // TODO: consider valuesPMF vs valuesCDF calls to allow a mix? |
| if (values != null) { |
| ranks = resultType.equals(QUERY_RESULT_TYPE_CDF) ? kll.getCDF(values) : kll.getPMF(values); |
| } |
| |
| if (fractions != null) { |
| quantiles = kll.getQuantiles(fractions); |
| } |
| |
| if (addSummary) |
| summary = kll.toString(); |
| break; |
| |
| default: |
| throw new SketchesException("processQuantilesQuery() received a non-quantiles sketch: " + type); |
| } |
| |
| //final JsonObject result = new JsonObject(); |
| result.addProperty(RESPONSE_STREAM_LENGTH, streamLength); |
| result.addProperty(RESPONSE_ESTIMATION_MODE_FIELD, isEstimationMode); |
| result.addProperty(RESPONSE_MIN_VALUE, minValue); |
| result.addProperty(RESPONSE_MAX_VALUE, maxValue); |
| |
| if (ranks != null) { |
| final String label = resultType.equals(QUERY_RESULT_TYPE_PMF) ? RESPONSE_RESULT_MASS : RESPONSE_RESULT_RANK; |
| final JsonArray rankArray = new JsonArray(); |
| for (int i = 0; i < ranks.length; ++i) { |
| final JsonObject rankPair = new JsonObject(); |
| if (i == values.length) { |
| rankPair.addProperty(RESPONSE_RESULT_VALUE, maxValue); |
| } else { |
| rankPair.addProperty(RESPONSE_RESULT_VALUE, values[i]); |
| } |
| rankPair.addProperty(label, ranks[i]); |
| rankArray.add(rankPair); |
| } |
| |
| if (resultType.equals(QUERY_RESULT_TYPE_CDF)) |
| result.add(RESPONSE_CDF_LIST, rankArray); |
| else |
| result.add(RESPONSE_PMF_LIST, rankArray); |
| } |
| |
| if (quantiles != null) { |
| final JsonArray quantileArray = new JsonArray(); |
| for (int i = 0; i < quantiles.length; ++i) { |
| final JsonObject quantilePair = new JsonObject(); |
| quantilePair.addProperty(RESPONSE_RESULT_RANK, fractions[i]); |
| quantilePair.addProperty(RESPONSE_RESULT_QUANTILE, quantiles[i]); |
| quantileArray.add(quantilePair); |
| } |
| result.add(RESPONSE_QUANTILE_LIST, quantileArray); |
| } |
| |
| if (addSummary) |
| result.addProperty(RESPONSE_SUMMARY_FIELD, summary); |
| |
| return result; |
| } |
| |
| // only one sketch type here so could use ItemsSketch<String> instead of Object, but |
| // we'll eep the signatures generic here |
| @SuppressWarnings("unchecked") |
| private static JsonObject processFrequencyQuery(final JsonObject result, final JsonObject query, final Family type, final Object sketch) { |
| if (query == null || type != Family.FREQUENCY || sketch == null) { |
| return null; |
| } |
| |
| final ItemsSketch<String> sk = (ItemsSketch<String>) sketch; |
| |
| // check if we need a summary |
| final boolean addSummary = checkSummaryFlag(query); |
| |
| if (!query.has(QUERY_ERRORTYPE_FIELD)) { |
| throw new SketchesException("Must specify a value for " + QUERY_ERRORTYPE_FIELD |
| + " for Frequent Items queries"); |
| } |
| |
| final ErrorType errorType; |
| final String errorTypeString = query.get(QUERY_ERRORTYPE_FIELD).getAsString(); |
| if (errorTypeString.equals(QUERY_ERRORTYPE_NO_FP)) { |
| errorType = ErrorType.NO_FALSE_POSITIVES; |
| } else if (errorTypeString.equals(QUERY_ERRORTYPE_NO_FN)) { |
| errorType = ErrorType.NO_FALSE_NEGATIVES; |
| } else { |
| throw new SketchesException("Unknown Frequent Items ErrorType: " + errorTypeString); |
| } |
| |
| final ItemsSketch.Row<String>[] items = sk.getFrequentItems(errorType); |
| |
| final JsonArray itemArray = new JsonArray(); |
| for (final ItemsSketch.Row<String> item : items) { |
| final JsonObject row = new JsonObject(); |
| row.addProperty(RESPONSE_ITEM_VALUE, item.getItem()); |
| row.addProperty(RESPONSE_ITEM_ESTIMATE, item.getEstimate()); |
| row.addProperty(RESPONSE_ITEM_UPPER_BOUND, item.getUpperBound()); |
| row.addProperty(RESPONSE_ITEM_LOWER_BOUND, item.getLowerBound()); |
| itemArray.add(row); |
| } |
| |
| //final JsonObject result = new JsonObject(); |
| result.add(RESPONSE_ITEMS_ARRAY, itemArray); |
| if (addSummary) |
| result.addProperty(RESPONSE_SUMMARY_FIELD, sk.toString()); |
| |
| return result; |
| } |
| |
| @SuppressWarnings("unchecked") |
| private static JsonObject processSamplingQuery(final JsonObject result, final JsonObject query, final Family type, final Object sketch) { |
| if (query == null || type == null || sketch == null) { |
| return null; |
| } |
| |
| // check if we need a summary |
| final boolean addSummary = checkSummaryFlag(query); |
| String summary = null; |
| |
| final long streamWeight; |
| final int k; |
| final JsonArray itemArray = new JsonArray(); |
| |
| switch (type) { |
| case RESERVOIR: |
| final ReservoirItemsSketch<String> ris = (ReservoirItemsSketch<String>) sketch; |
| for (final String item : ris.getSamples()) { |
| itemArray.add(item); |
| } |
| streamWeight = ris.getN(); |
| k = ris.getK(); |
| if (addSummary) |
| summary = ris.toString(); |
| break; |
| |
| case VAROPT: |
| final VarOptItemsSketch<String> vis = (VarOptItemsSketch<String>) sketch; |
| for (final VarOptItemsSamples<String>.WeightedSample ws : vis.getSketchSamples()) { |
| final JsonObject item = new JsonObject(); |
| item.addProperty(RESPONSE_ITEM_VALUE, ws.getItem()); |
| item.addProperty(RESPONSE_ITEM_WEIGHT, ws.getWeight()); |
| itemArray.add(item); |
| } |
| streamWeight = vis.getN(); |
| k = vis.getK(); |
| if (addSummary) |
| summary = vis.toString(); |
| break; |
| |
| default: |
| throw new SketchesException("processSamplingQuery() received a non-sampling sketch: " + type); |
| } |
| |
| //final JsonObject result = new JsonObject(); |
| result.addProperty(RESPONSE_SKETCH_K, k); |
| result.addProperty(RESPONSE_STREAM_WEIGHT, streamWeight); |
| result.add(RESPONSE_ITEMS_ARRAY, itemArray); |
| if (addSummary) |
| result.addProperty(RESPONSE_SUMMARY_FIELD, summary); |
| |
| return result; |
| } |
| |
| // returns an array of rank points, or null if none in query |
| private static float[] getValuesArray(final JsonObject query) { |
| if (query == null || !query.has(QUERY_VALUES_FIELD_NAME)) { |
| return null; |
| } |
| |
| final JsonArray valuesArray = query.get(QUERY_VALUES_FIELD_NAME).getAsJsonArray(); |
| final float[] values = new float[valuesArray.size()]; |
| |
| for (int i = 0; i < values.length; ++i) { |
| if (!valuesArray.get(i).isJsonPrimitive()) { |
| throw new SketchesException("Invalid value in array. Must be a floating point value, found: " + valuesArray.get(i)); |
| } |
| values[i] = valuesArray.get(i).getAsFloat(); |
| } |
| |
| return values; |
| } |
| |
| // returns QUERY_RESULT_TYPE_PMF if specified in QUERY_RESULT_TYPE_NAME_FIELD, otherwise returns default of |
| // QUERY_RESULT_TYPE_CDF |
| private static String getResultType(final JsonObject query) { |
| if (query == null || !query.has(QUERY_RESULT_TYPE_NAME_FIELD)) { |
| return QUERY_RESULT_TYPE_CDF; |
| } |
| |
| final JsonElement resultTypeElement = query.get(QUERY_RESULT_TYPE_NAME_FIELD); |
| if (!resultTypeElement.isJsonPrimitive()) |
| return QUERY_RESULT_TYPE_CDF; |
| |
| final String resultType = resultTypeElement.getAsString().toLowerCase(); |
| if (resultType.equals(QUERY_RESULT_TYPE_PMF)) |
| return QUERY_RESULT_TYPE_PMF; |
| else |
| return QUERY_RESULT_TYPE_CDF; |
| } |
| |
| // returns an array of provided split points, or null if none in query |
| private static double[] getFractionsArray(final JsonObject query) { |
| if (query == null || !query.has(QUERY_FRACTIONS_NAME_FIELD)) { |
| return null; |
| } |
| |
| final JsonArray fractionsArray = query.get(QUERY_FRACTIONS_NAME_FIELD).getAsJsonArray(); |
| final double[] fractions = new double[fractionsArray.size()]; |
| |
| for (int i = 0; i < fractions.length; ++i) { |
| if (!fractionsArray.get(i).isJsonPrimitive()) { |
| throw new SketchesException("Invalid value in array. Must be float in [0.0, 1.0], found: " + fractionsArray.get(i)); |
| } |
| fractions[i] = fractionsArray.get(i).getAsFloat(); |
| } |
| |
| return fractions; |
| } |
| |
| } |