package org.apache.datasketches.characterization.quantiles;
import static java.lang.Math.round;
import static org.apache.datasketches.ExponentiallySpacedPoints.expSpaced;
import static org.apache.datasketches.GaussianRanks.GAUSSIANS_3SD;
import static org.apache.datasketches.Util.evenlySpaced;
import static org.apache.datasketches.Util.pwr2LawNext;
import org.apache.datasketches.Job;
import org.apache.datasketches.JobProfile;
import org.apache.datasketches.MonotonicPoints;
import org.apache.datasketches.Properties;
import org.apache.datasketches.characterization.Shuffle;
import org.apache.datasketches.characterization.quantiles.StreamMaker.Pattern;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.quantiles.DoublesSketch;
import org.apache.datasketches.quantiles.DoublesSketchBuilder;
import org.apache.datasketches.quantiles.UpdateDoublesSketch;
import org.apache.datasketches.req.ReqDebugImpl;
import org.apache.datasketches.req.ReqSketch;
import org.apache.datasketches.req.ReqSketchBuilder;
/**
 * Accuracy characterization profile for the ReqSketch.
 *
 * @author Lee Rhodes
 */
public class ReqSketchAccuracyProfile implements JobProfile {
//The owning job (assigned in start) and the properties obtained from it.
private Job job;
private Properties prop;
//Stream pattern config
//Builds the input stream for each stream length from (streamLength, pattern, offset).
StreamMaker streamMaker = new StreamMaker();
private Pattern pattern; //parsed from the "Pattern" property
private int offset; //parsed from the "Offset" property
//For computing the different stream lengths
private int lgMin; //the first stream length is 1 << lgMin
private int lgMax;
private int lgDelta; //if < 1, the stream lengths are stepped using ppo instead
private int ppo; //used for stepping the stream lengths only when lgDelta < 1
private int numTrials; //num of Trials per plotPoint, computed as 1 << LgTrials
private int errQSkLgK; //size of the error quantiles sketches, their K is 1 << errQSkLgK
private int errHllSkLgK; //size of the error HLL sketch
private boolean shuffle; //if true, shuffle for each trial
//plotting & x-axis configuration
private int numPlotPoints;
private boolean evenlySpaced; //if true, plot points are evenly spaced, else exponentially spaced
private double exponent; //passed to expSpaced when the plot points are not evenly spaced
private int sd; //number of standard deviations passed to the sketch's rank bound queries
private double rankRange; //if < 1.0, plot points are restricted to a sub-range at one end
//Target sketch configuration & error analysis
private int K; //parsed from the "K" property
private boolean hra; //high rank accuracy
private boolean ltEq; //passed to TrueRanks to select how the true ranks are computed
//Optional, only created when the "ReqDebugLevel" property is present.
private org.apache.datasketches.req.ReqDebugImpl reqDebugImpl = null;
//DERIVED globals
private ReqSketch sk; //the sketch under test
//The array of Gaussian quantiles for +/- StdDev error analysis
private double[] gRanks; //GAUSSIANS_3SD without its first and last entries
private UpdateDoublesSketch[] errQSkArr; //one error distribution sketch per plot point
private HllSketch[] errHllSkArr; //one unique-error-value counter per plot point
//Specific to a streamLength
private TrueRanks trueRanks;
//The entire stream
private float[] stream; //a shuffled array of values from 1...N
private float[] sortedStream;
private int[] sortedAbsRanks;
//private int[] streamAbsRanks ?? do we need?
//The PP points
//These three arrays are parallel, each is indexed by plot point.
private float[] sortedPPValues;
private int[] sortedPPIndices;
private int[] sortedPPAbsRanks;
//Not read or written by any live code visible here, only by a commented-out line.
int sumAllocCounts = 0;
//Header labels of the 13 tab-separated output columns, in the order the data rows are printed.
private final String[] columnLabels =
    {"nPP", "Value", "Rank",
     "-3SD", "-2SD", "-1SD", "Med", "+1SD", "+2SD", "+3SD",
     "1LB", "1UB", "UErrCnt"};
//Header row format: exactly one %s conversion per entry of columnLabels (3 + 7 + 3 = 13).
private final String sFmt =
    "%3s\t%5s\t%4s\t" //nPP, Value, Rank
    + "%4s\t%4s\t%4s\t%5s\t%4s\t%4s\t%4s\t" //-3sd to +3sd
    + "%3s\t%3s\t%7s\n"; //1lb, 1ub, UErrCnt
//Data row format: one conversion per column, 12 floating point values and a final integer count.
private final String fFmt =
    "%14.10f\t%14.0f\t%14.10f\t" //nPP, Value, Rank
    + "%14.10f\t%14.10f\t%14.10f\t%14.10f\t%14.10f\t%14.10f\t%14.10f\t" //-3sd to +3sd
    + "%14.10f\t%14.10f\t%6d\n"; //1lb, 1ub, UErrCnt
//JobProfile interface
public void start(final Job job) {
this.job = job;
prop = job.getProperties();
/** JobProfile shutdown hook. Intentionally does nothing. */
public void shutdown() {
  //no-op
}
/** JobProfile cleanup hook. Intentionally does nothing. */
public void cleanup() {
  //no-op
}
//end JobProfile
private void extractProperties() {
//Stream Pattern
pattern = Pattern.valueOf(prop.mustGet("Pattern"));
offset = Integer.parseInt(prop.mustGet("Offset"));
//Stream lengths
lgMin = Integer.parseInt(prop.mustGet("LgMin"));
lgMax = Integer.parseInt(prop.mustGet("LgMax"));
lgDelta = Integer.parseInt(prop.mustGet("LgDelta"));
ppo = Integer.parseInt(prop.mustGet("PPO"));
// Trials config (indep of sketch)
numTrials = 1 << Integer.parseInt(prop.mustGet("LgTrials"));
errQSkLgK = Integer.parseInt(prop.mustGet("ErrQSkLgK"));
errHllSkLgK = Integer.parseInt(prop.mustGet("ErrHllSkLgK"));
shuffle = Boolean.valueOf(prop.mustGet("Shuffle"));
numPlotPoints = Integer.parseInt(prop.mustGet("NumPlotPoints"));
evenlySpaced = Boolean.valueOf(prop.mustGet("EvenlySpaced"));
exponent = Double.parseDouble(prop.mustGet("Exponent"));
sd = Integer.parseInt(prop.mustGet("StdDev"));
rankRange = Double.parseDouble(prop.mustGet("RankRange"));
//Target sketch config
K = Integer.parseInt(prop.mustGet("K"));
hra = Boolean.parseBoolean(prop.mustGet("HRA"));
ltEq = Boolean.parseBoolean(prop.mustGet("LtEq"));
//criterion = InequalitySearch.valueOf(prop.mustGet("Criterion"));
String reqDebugLevel = prop.get("ReqDebugLevel");
String reqDebugFmt = prop.get("ReqDebugFmt");
if (reqDebugLevel != null) {
int level = Integer.parseInt(reqDebugLevel);
reqDebugImpl = new ReqDebugImpl(level, reqDebugFmt);
void configureCommon() {
errQSkArr = new UpdateDoublesSketch[numPlotPoints];
errHllSkArr = new HllSketch[numPlotPoints];
//configure the error quantiles array & HLL sketch arr
final DoublesSketchBuilder builder = DoublesSketch.builder().setK(1 << errQSkLgK);
for (int i = 0; i < numPlotPoints; i++) {
errQSkArr[i] =;
errHllSkArr[i] = new HllSketch(errHllSkLgK);
gRanks = new double[GAUSSIANS_3SD.length - 2]; //omit 0.0 and 1.0
for (int i = 1; i < GAUSSIANS_3SD.length - 1; i++) {
gRanks[i - 1] = GAUSSIANS_3SD[i];
void configureSketch() {
final ReqSketchBuilder bldr = ReqSketch.builder();
if (reqDebugImpl != null) { bldr.setReqDebug(reqDebugImpl); }
sk =;
private void doStreamLengths() {
//compute the number of stream lengths for the whole job
final int numSteps;
final boolean useppo;
if (lgDelta < 1) {
numSteps = MonotonicPoints.countPoints(lgMin, lgMax, ppo);
useppo = true;
} else {
numSteps = (lgMax - lgMin) / lgDelta + 1;
useppo = false;
int streamLength = 1 << lgMin; //initial streamLength
int lgCurSL = lgMin;
// Step through the different stream lengths
for (int step = 0; step < numSteps; step++) {
//go to next stream length
if (useppo) {
streamLength = pwr2LawNext(ppo, streamLength);
} else {
lgCurSL += lgDelta;
streamLength = 1 << lgCurSL;
void doStreamLength(final int streamLength) {
job.println(LS + "Stream Length: " + streamLength );
job.printfData(sFmt, (Object[])columnLabels);
//build the stream
stream = streamMaker.makeStream(streamLength, pattern, offset);
//compute true ranks
if (ltEq) {
trueRanks = new TrueRanks(stream, true);
} else {
trueRanks = new TrueRanks(stream, false);
sortedStream = trueRanks.getSortedStream();
sortedAbsRanks = trueRanks.getSortedAbsRanks();
//compute the true values used at the plot points
int startIdx = 0;
int endIdx = streamLength - 1;
if (rankRange < 1.0) { //A substream of points focuses on a sub-range at one end.
final int subStreamLen = (int)Math.round(rankRange * streamLength);
startIdx = hra ? streamLength - subStreamLen : 0;
endIdx = hra ? streamLength - 1 : subStreamLen - 1;
//generates PP indices in [startIdx, endIdx] inclusive, inclusive // PV 2020-01-07: using double so that there's enough precision even for large stream lengths
final double[] temp = evenlySpaced
? evenlySpaced(startIdx, endIdx, numPlotPoints)
: expSpaced(startIdx, endIdx, numPlotPoints, exponent, hra);
sortedPPIndices = new int[numPlotPoints];
sortedPPAbsRanks = new int[numPlotPoints];
sortedPPValues = new float[numPlotPoints];
for (int pp = 0; pp < numPlotPoints; pp++) {
final int idx = (int)Math.round(temp[pp]);
sortedPPIndices[pp] = idx;
sortedPPAbsRanks[pp] = sortedAbsRanks[idx];
sortedPPValues[pp] = sortedStream[idx];
//Do numTrials for all plotpoints
for (int t = 0; t < numTrials; t++) {
//sumAllocCounts = sk.
//at this point each of the errQSkArr sketches has a distribution of error from numTrials
for (int pp = 0 ; pp < numPlotPoints; pp++) {
final double v = sortedPPValues[pp];
final double tr = v / streamLength; //the true rank
final double rlb = sk.getRankLowerBound(tr, sd) - tr;
final double rub = sk.getRankUpperBound(tr, sd) - tr;
//for each of the numErrDistRanks distributions extract the sd Gaussian quantiles
final double[] errQ = errQSkArr[pp].getQuantiles(gRanks);
final int uErrCnt = (int)round(errHllSkArr[pp].getEstimate());
//Plot the row.
final double relPP = (double)(pp + 1) / numPlotPoints;
job.printfData(fFmt, relPP, v, tr,
errQ[0], errQ[1], errQ[2], errQ[3], errQ[4], errQ[5], errQ[6],
rlb, rub, uErrCnt);
errQSkArr[pp].reset(); //reset the errQSkArr for next streamLength
errHllSkArr[pp].reset(); //reset the errHllSkArr for next streamLength
job.println(LS + "Serialization Bytes: " + sk.getSerializationBytes());
job.println(sk.viewCompactorDetail("%5.0f", false));
* A trial consists of updating a virgin sketch with a stream of values.
* Capture the estimated ranks for all plotPoints and then update the errQSkArr with those
* error values.
void doTrial() {
if (shuffle) { Shuffle.shuffle(stream); }
final int sl = stream.length;
for (int i = 0; i < sl; i++) { sk.update(stream[i]); }
//get estimated ranks from sketch for all plotpoints
final double[] estRanks = sk.getRanks(sortedPPValues);
//compute errors and update HLL for each plotPoint
for (int pp = 0; pp < numPlotPoints; pp++) {
final double errorAtPlotPoint = estRanks[pp] - (double)sortedPPAbsRanks[pp] / sl;
errQSkArr[pp].update(errorAtPlotPoint); //update each of the errQArr sketches
errHllSkArr[pp].update(errorAtPlotPoint); //unique count of error values