Merge pull request #533 from romseygeek/sketch-bytebuffers-update
Allow updating sketches with ByteBuffers
diff --git a/src/main/java/org/apache/datasketches/hllmap/UniqueCountMap.java b/src/main/java/org/apache/datasketches/hllmap/UniqueCountMap.java
index 9a480d9..431920d 100644
--- a/src/main/java/org/apache/datasketches/hllmap/UniqueCountMap.java
+++ b/src/main/java/org/apache/datasketches/hllmap/UniqueCountMap.java
@@ -36,6 +36,7 @@
* <p>The space consumed by this map is quite sensitive to the actual distribution of identifiers
* per key, so you should characterize and or experiment with your typical input streams.
* Nonetheless, our experiments on live streams of over 100M keys required about 1.4GB of space.
+ * This is about 14 bytes per key for key storage and unique count storage.
*
* <p>Given such highly-skewed distributions, using this map is far more efficient space-wise than
* the alternative of dedicating an HLL sketch per key. Based on our use cases, after
diff --git a/src/main/java/org/apache/datasketches/kll/KllDoublesSketch.java b/src/main/java/org/apache/datasketches/kll/KllDoublesSketch.java
index 8883734..ab241f7 100644
--- a/src/main/java/org/apache/datasketches/kll/KllDoublesSketch.java
+++ b/src/main/java/org/apache/datasketches/kll/KllDoublesSketch.java
@@ -25,6 +25,7 @@
import static org.apache.datasketches.kll.KllSketch.SketchStructure.UPDATABLE;
import static org.apache.datasketches.kll.KllSketch.SketchType.DOUBLES_SKETCH;
+import java.util.Arrays;
import java.util.Objects;
import org.apache.datasketches.common.ArrayOfItemsSerDe;
@@ -35,7 +36,7 @@
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.MemoryRequestServer;
import org.apache.datasketches.memory.WritableMemory;
-import org.apache.datasketches.quantilescommon.DoublesSortedView;
+import org.apache.datasketches.quantilescommon.DoublesSketchSortedView;
import org.apache.datasketches.quantilescommon.QuantileSearchCriteria;
import org.apache.datasketches.quantilescommon.QuantilesDoublesAPI;
import org.apache.datasketches.quantilescommon.QuantilesDoublesSketchIterator;
@@ -46,7 +47,7 @@
* @see org.apache.datasketches.kll.KllSketch
*/
public abstract class KllDoublesSketch extends KllSketch implements QuantilesDoublesAPI {
- private KllDoublesSketchSortedView kllDoublesSV = null;
+ private DoublesSketchSortedView doublesSV = null;
final static int ITEM_BYTES = Double.BYTES;
KllDoublesSketch(
@@ -171,21 +172,21 @@
public double[] getCDF(final double[] splitPoints, final QuantileSearchCriteria searchCrit) {
if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
refreshSortedView();
- return kllDoublesSV.getCDF(splitPoints, searchCrit);
+ return doublesSV.getCDF(splitPoints, searchCrit);
}
@Override
public double[] getPMF(final double[] splitPoints, final QuantileSearchCriteria searchCrit) {
if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
refreshSortedView();
- return kllDoublesSV.getPMF(splitPoints, searchCrit);
+ return doublesSV.getPMF(splitPoints, searchCrit);
}
@Override
public double getQuantile(final double rank, final QuantileSearchCriteria searchCrit) {
if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
refreshSortedView();
- return kllDoublesSV.getQuantile(rank, searchCrit);
+ return doublesSV.getQuantile(rank, searchCrit);
}
@Override
@@ -195,7 +196,7 @@
final int len = ranks.length;
final double[] quantiles = new double[len];
for (int i = 0; i < len; i++) {
- quantiles[i] = kllDoublesSV.getQuantile(ranks[i], searchCrit);
+ quantiles[i] = doublesSV.getQuantile(ranks[i], searchCrit);
}
return quantiles;
}
@@ -224,7 +225,7 @@
public double getRank(final double quantile, final QuantileSearchCriteria searchCrit) {
if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
refreshSortedView();
- return kllDoublesSV.getRank(quantile, searchCrit);
+ return doublesSV.getRank(quantile, searchCrit);
}
/**
@@ -254,17 +255,17 @@
final int len = quantiles.length;
final double[] ranks = new double[len];
for (int i = 0; i < len; i++) {
- ranks[i] = kllDoublesSV.getRank(quantiles[i], searchCrit);
+ ranks[i] = doublesSV.getRank(quantiles[i], searchCrit);
}
return ranks;
}
@Override
@SuppressFBWarnings(value = "EI_EXPOSE_REP", justification = "OK in this case.")
- public DoublesSortedView getSortedView() {
+ public DoublesSketchSortedView getSortedView() {
if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
refreshSortedView();
- return kllDoublesSV;
+ return doublesSV;
}
@Override
@@ -278,9 +279,9 @@
if (readOnly || sketchStructure != UPDATABLE) { throw new SketchesArgumentException(TGT_IS_READ_ONLY_MSG); }
if (this == other) { throw new SketchesArgumentException(SELF_MERGE_MSG); }
final KllDoublesSketch othDblSk = (KllDoublesSketch)other;
- if (othDblSk.isEmpty()) { return; } //then check empty
+ if (othDblSk.isEmpty()) { return; }
KllDoublesHelper.mergeDoubleImpl(this, othDblSk);
- kllDoublesSV = null;
+ doublesSV = null;
}
/**
@@ -299,7 +300,7 @@
setMinItem(Double.NaN);
setMaxItem(Double.NaN);
setDoubleItemsArray(new double[k]);
- kllDoublesSV = null;
+ doublesSV = null;
}
@Override
@@ -323,7 +324,7 @@
if (Double.isNaN(item)) { return; } //ignore
if (readOnly) { throw new SketchesArgumentException(TGT_IS_READ_ONLY_MSG); }
KllDoublesHelper.updateDouble(this, item);
- kllDoublesSV = null;
+ doublesSV = null;
}
/**
@@ -337,7 +338,7 @@
if (weight < 1L) { throw new SketchesArgumentException("Weight is less than one."); }
if (weight == 1L) { KllDoublesHelper.updateDouble(this, item); }
else { KllDoublesHelper.updateDouble(this, item, weight); }
- kllDoublesSV = null;
+ doublesSV = null;
}
//restricted
@@ -393,11 +394,6 @@
return levelsArr[getNumLevels()] * Double.BYTES;
}
- private final void refreshSortedView() {
- kllDoublesSV = (kllDoublesSV == null)
- ? new KllDoublesSketchSortedView(this) : kllDoublesSV;
- }
-
abstract void setDoubleItemsArray(double[] doubleItems);
abstract void setDoubleItemsArrayAt(int index, double item);
@@ -416,4 +412,135 @@
}
}
+ private final DoublesSketchSortedView refreshSortedView() {
+ if (doublesSV == null) {
+ final CreateSortedView csv = new CreateSortedView();
+ doublesSV = csv.getSV();
+ }
+ return doublesSV;
+ }
+
+ private final class CreateSortedView {
+ double[] quantiles;
+ long[] cumWeights;
+
+ DoublesSketchSortedView getSV() {
+ if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
+ if (getN() == 0) { throw new SketchesArgumentException(EMPTY_MSG); }
+ final double[] srcQuantiles = getDoubleItemsArray();
+ final int[] srcLevels = levelsArr;
+ final int srcNumLevels = getNumLevels();
+
+ if (!isLevelZeroSorted()) {
+ Arrays.sort(srcQuantiles, srcLevels[0], srcLevels[1]);
+ if (!hasMemory()) { setLevelZeroSorted(true); }
+ }
+ final int numQuantiles = getNumRetained();
+ quantiles = new double[numQuantiles];
+ cumWeights = new long[numQuantiles];
+ populateFromSketch(srcQuantiles, srcLevels, srcNumLevels, numQuantiles);
+ return new DoublesSketchSortedView(
+ quantiles, cumWeights, getN(), getMaxItem(), getMinItem());
+ }
+
+ private void populateFromSketch(final double[] srcQuantiles, final int[] srcLevels,
+ final int srcNumLevels, final int numItems) {
+ final int[] myLevels = new int[srcNumLevels + 1];
+ final int offset = srcLevels[0];
+ System.arraycopy(srcQuantiles, offset, quantiles, 0, numItems);
+ int srcLevel = 0;
+ int dstLevel = 0;
+ long weight = 1;
+ while (srcLevel < srcNumLevels) {
+ final int fromIndex = srcLevels[srcLevel] - offset;
+ final int toIndex = srcLevels[srcLevel + 1] - offset; // exclusive
+ if (fromIndex < toIndex) { // if equal, skip empty level
+ Arrays.fill(cumWeights, fromIndex, toIndex, weight);
+ myLevels[dstLevel] = fromIndex;
+ myLevels[dstLevel + 1] = toIndex;
+ dstLevel++;
+ }
+ srcLevel++;
+ weight *= 2;
+ }
+ final int numLevels = dstLevel;
+ blockyTandemMergeSort(quantiles, cumWeights, myLevels, numLevels); //create unit weights
+ KllHelper.convertToCumulative(cumWeights);
+ }
+ } //End of class CreateSortedView
+
+ private static void blockyTandemMergeSort(final double[] quantiles, final long[] weights,
+ final int[] levels, final int numLevels) {
+ if (numLevels == 1) { return; }
+
+ // duplicate the input in preparation for the "ping-pong" copy reduction strategy.
+ final double[] quantilesTmp = Arrays.copyOf(quantiles, quantiles.length);
+ final long[] weightsTmp = Arrays.copyOf(weights, quantiles.length); // don't need the extra one
+
+ blockyTandemMergeSortRecursion(quantilesTmp, weightsTmp, quantiles, weights, levels, 0, numLevels);
+ }
+
+ private static void blockyTandemMergeSortRecursion(
+ final double[] quantilesSrc, final long[] weightsSrc,
+ final double[] quantilesDst, final long[] weightsDst,
+ final int[] levels, final int startingLevel, final int numLevels) {
+ if (numLevels == 1) { return; }
+ final int numLevels1 = numLevels / 2;
+ final int numLevels2 = numLevels - numLevels1;
+ assert numLevels1 >= 1;
+ assert numLevels2 >= numLevels1;
+ final int startingLevel1 = startingLevel;
+ final int startingLevel2 = startingLevel + numLevels1;
+ // swap roles of src and dst
+ blockyTandemMergeSortRecursion(
+ quantilesDst, weightsDst,
+ quantilesSrc, weightsSrc,
+ levels, startingLevel1, numLevels1);
+ blockyTandemMergeSortRecursion(
+ quantilesDst, weightsDst,
+ quantilesSrc, weightsSrc,
+ levels, startingLevel2, numLevels2);
+ tandemMerge(
+ quantilesSrc, weightsSrc,
+ quantilesDst, weightsDst,
+ levels,
+ startingLevel1, numLevels1,
+ startingLevel2, numLevels2);
+ }
+
+ private static void tandemMerge(
+ final double[] quantilesSrc, final long[] weightsSrc,
+ final double[] quantilesDst, final long[] weightsDst,
+ final int[] levelStarts,
+ final int startingLevel1, final int numLevels1,
+ final int startingLevel2, final int numLevels2) {
+ final int fromIndex1 = levelStarts[startingLevel1];
+ final int toIndex1 = levelStarts[startingLevel1 + numLevels1]; // exclusive
+ final int fromIndex2 = levelStarts[startingLevel2];
+ final int toIndex2 = levelStarts[startingLevel2 + numLevels2]; // exclusive
+ int iSrc1 = fromIndex1;
+ int iSrc2 = fromIndex2;
+ int iDst = fromIndex1;
+
+ while (iSrc1 < toIndex1 && iSrc2 < toIndex2) {
+ if (quantilesSrc[iSrc1] < quantilesSrc[iSrc2]) {
+ quantilesDst[iDst] = quantilesSrc[iSrc1];
+ weightsDst[iDst] = weightsSrc[iSrc1];
+ iSrc1++;
+ } else {
+ quantilesDst[iDst] = quantilesSrc[iSrc2];
+ weightsDst[iDst] = weightsSrc[iSrc2];
+ iSrc2++;
+ }
+ iDst++;
+ }
+ if (iSrc1 < toIndex1) {
+ System.arraycopy(quantilesSrc, iSrc1, quantilesDst, iDst, toIndex1 - iSrc1);
+ System.arraycopy(weightsSrc, iSrc1, weightsDst, iDst, toIndex1 - iSrc1);
+ } else if (iSrc2 < toIndex2) {
+ System.arraycopy(quantilesSrc, iSrc2, quantilesDst, iDst, toIndex2 - iSrc2);
+ System.arraycopy(weightsSrc, iSrc2, weightsDst, iDst, toIndex2 - iSrc2);
+ }
+ }
+
}
diff --git a/src/main/java/org/apache/datasketches/kll/KllDoublesSketchSortedView.java b/src/main/java/org/apache/datasketches/kll/KllDoublesSketchSortedView.java
deleted file mode 100644
index 13f0a9d..0000000
--- a/src/main/java/org/apache/datasketches/kll/KllDoublesSketchSortedView.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.datasketches.kll;
-
-import static org.apache.datasketches.quantilescommon.QuantileSearchCriteria.INCLUSIVE;
-import static org.apache.datasketches.quantilescommon.QuantilesAPI.EMPTY_MSG;
-import static org.apache.datasketches.quantilescommon.QuantilesUtil.getNaturalRank;
-
-import java.util.Arrays;
-
-import org.apache.datasketches.common.SketchesArgumentException;
-import org.apache.datasketches.quantilescommon.DoublesSortedView;
-import org.apache.datasketches.quantilescommon.DoublesSortedViewIterator;
-import org.apache.datasketches.quantilescommon.InequalitySearch;
-import org.apache.datasketches.quantilescommon.QuantileSearchCriteria;
-import org.apache.datasketches.quantilescommon.QuantilesUtil;
-
-/**
- * The SortedView of the KllDoublesSketch.
- * @author Alexander Saydakov
- * @author Lee Rhodes
- */
-public final class KllDoublesSketchSortedView implements DoublesSortedView {
- private final double[] quantiles;
- private final long[] cumWeights; //comes in as individual weights, converted to cumulative natural weights
- private final long totalN;
- private final double maxItem;
- private final double minItem;
-
- /**
- * Construct from elements for testing.
- * @param quantiles sorted array of quantiles
- * @param cumWeights sorted, monotonically increasing cumulative weights.
- * @param totalN the total number of items presented to the sketch.
- */
- KllDoublesSketchSortedView(final double[] quantiles, final long[] cumWeights, final long totalN,
- final double maxItem, final double minItem) {
- this.quantiles = quantiles;
- this.cumWeights = cumWeights;
- this.totalN = totalN;
- this.maxItem = maxItem;
- this.minItem = minItem;
- }
-
- /**
- * Constructs this Sorted View given the sketch
- * @param sketch the given KllDoublesSketch.
- */
- public KllDoublesSketchSortedView(final KllDoublesSketch sketch) {
- if (sketch.isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
- this.totalN = sketch.getN();
- this.maxItem = sketch.getMaxItem();
- this.minItem = sketch.getMinItem();
- final double[] srcQuantiles = sketch.getDoubleItemsArray();
- final int[] srcLevels = sketch.levelsArr;
- final int srcNumLevels = sketch.getNumLevels();
-
- if (!sketch.isLevelZeroSorted()) {
- Arrays.sort(srcQuantiles, srcLevels[0], srcLevels[1]);
- if (!sketch.hasMemory()) { sketch.setLevelZeroSorted(true); }
- }
-
- final int numQuantiles = srcLevels[srcNumLevels] - srcLevels[0]; //remove free space
- quantiles = new double[numQuantiles];
- cumWeights = new long[numQuantiles];
- populateFromSketch(srcQuantiles, srcLevels, srcNumLevels, numQuantiles);
- }
-
- @Override
- public long[] getCumulativeWeights() {
- return cumWeights.clone();
- }
-
- @Override
- public double getMaxItem() {
- return maxItem;
- }
-
- @Override
- public double getMinItem() {
- return minItem;
- }
-
- @Override
- public long getN() {
- return totalN;
- }
-
- @Override
- public double getQuantile(final double rank, final QuantileSearchCriteria searchCrit) {
- if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
- QuantilesUtil.checkNormalizedRankBounds(rank);
- final int len = cumWeights.length;
- final double naturalRank = getNaturalRank(rank, totalN, searchCrit);
- final InequalitySearch crit = (searchCrit == INCLUSIVE) ? InequalitySearch.GE : InequalitySearch.GT;
- final int index = InequalitySearch.find(cumWeights, 0, len - 1, naturalRank, crit);
- if (index == -1) {
- return quantiles[len - 1]; //EXCLUSIVE (GT) case: normRank == 1.0;
- }
- return quantiles[index];
- }
-
- @Override
- public double[] getQuantiles() {
- return quantiles.clone();
- }
-
- @Override
- public double getRank(final double quantile, final QuantileSearchCriteria searchCrit) {
- if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
- final int len = quantiles.length;
- final InequalitySearch crit = (searchCrit == INCLUSIVE) ? InequalitySearch.LE : InequalitySearch.LT;
- final int index = InequalitySearch.find(quantiles, 0, len - 1, quantile, crit);
- if (index == -1) {
- return 0; //EXCLUSIVE (LT) case: quantile <= minQuantile; INCLUSIVE (LE) case: quantile < minQuantile
- }
- return (double)cumWeights[index] / totalN;
- }
-
- @Override
- public boolean isEmpty() {
- return totalN == 0;
- }
-
- @Override
- public DoublesSortedViewIterator iterator() {
- return new DoublesSortedViewIterator(quantiles, cumWeights);
- }
-
- //restricted methods
-
- private void populateFromSketch(final double[] srcQuantiles, final int[] srcLevels,
- final int srcNumLevels, final int numItems) {
- final int[] myLevels = new int[srcNumLevels + 1];
- final int offset = srcLevels[0];
- System.arraycopy(srcQuantiles, offset, quantiles, 0, numItems);
- int srcLevel = 0;
- int dstLevel = 0;
- long weight = 1;
- while (srcLevel < srcNumLevels) {
- final int fromIndex = srcLevels[srcLevel] - offset;
- final int toIndex = srcLevels[srcLevel + 1] - offset; // exclusive
- if (fromIndex < toIndex) { // if equal, skip empty level
- Arrays.fill(cumWeights, fromIndex, toIndex, weight);
- myLevels[dstLevel] = fromIndex;
- myLevels[dstLevel + 1] = toIndex;
- dstLevel++;
- }
- srcLevel++;
- weight *= 2;
- }
- final int numLevels = dstLevel;
- blockyTandemMergeSort(quantiles, cumWeights, myLevels, numLevels); //create unit weights
- KllHelper.convertToCumulative(cumWeights);
- }
-
- private static void blockyTandemMergeSort(final double[] quantiles, final long[] weights,
- final int[] levels, final int numLevels) {
- if (numLevels == 1) { return; }
-
- // duplicate the input in preparation for the "ping-pong" copy reduction strategy.
- final double[] quantilesTmp = Arrays.copyOf(quantiles, quantiles.length);
- final long[] weightsTmp = Arrays.copyOf(weights, quantiles.length); // don't need the extra one
-
- blockyTandemMergeSortRecursion(quantilesTmp, weightsTmp, quantiles, weights, levels, 0, numLevels);
- }
-
- private static void blockyTandemMergeSortRecursion(
- final double[] quantilesSrc, final long[] weightsSrc,
- final double[] quantilesDst, final long[] weightsDst,
- final int[] levels, final int startingLevel, final int numLevels) {
- if (numLevels == 1) { return; }
- final int numLevels1 = numLevels / 2;
- final int numLevels2 = numLevels - numLevels1;
- assert numLevels1 >= 1;
- assert numLevels2 >= numLevels1;
- final int startingLevel1 = startingLevel;
- final int startingLevel2 = startingLevel + numLevels1;
- // swap roles of src and dst
- blockyTandemMergeSortRecursion(
- quantilesDst, weightsDst,
- quantilesSrc, weightsSrc,
- levels, startingLevel1, numLevels1);
- blockyTandemMergeSortRecursion(
- quantilesDst, weightsDst,
- quantilesSrc, weightsSrc,
- levels, startingLevel2, numLevels2);
- tandemMerge(
- quantilesSrc, weightsSrc,
- quantilesDst, weightsDst,
- levels,
- startingLevel1, numLevels1,
- startingLevel2, numLevels2);
- }
-
- private static void tandemMerge(
- final double[] quantilesSrc, final long[] weightsSrc,
- final double[] quantilesDst, final long[] weightsDst,
- final int[] levelStarts,
- final int startingLevel1, final int numLevels1,
- final int startingLevel2, final int numLevels2) {
- final int fromIndex1 = levelStarts[startingLevel1];
- final int toIndex1 = levelStarts[startingLevel1 + numLevels1]; // exclusive
- final int fromIndex2 = levelStarts[startingLevel2];
- final int toIndex2 = levelStarts[startingLevel2 + numLevels2]; // exclusive
- int iSrc1 = fromIndex1;
- int iSrc2 = fromIndex2;
- int iDst = fromIndex1;
-
- while (iSrc1 < toIndex1 && iSrc2 < toIndex2) {
- if (quantilesSrc[iSrc1] < quantilesSrc[iSrc2]) {
- quantilesDst[iDst] = quantilesSrc[iSrc1];
- weightsDst[iDst] = weightsSrc[iSrc1];
- iSrc1++;
- } else {
- quantilesDst[iDst] = quantilesSrc[iSrc2];
- weightsDst[iDst] = weightsSrc[iSrc2];
- iSrc2++;
- }
- iDst++;
- }
- if (iSrc1 < toIndex1) {
- System.arraycopy(quantilesSrc, iSrc1, quantilesDst, iDst, toIndex1 - iSrc1);
- System.arraycopy(weightsSrc, iSrc1, weightsDst, iDst, toIndex1 - iSrc1);
- } else if (iSrc2 < toIndex2) {
- System.arraycopy(quantilesSrc, iSrc2, quantilesDst, iDst, toIndex2 - iSrc2);
- System.arraycopy(weightsSrc, iSrc2, weightsDst, iDst, toIndex2 - iSrc2);
- }
- }
-
-}
diff --git a/src/main/java/org/apache/datasketches/kll/KllItemsSketch.java b/src/main/java/org/apache/datasketches/kll/KllItemsSketch.java
index 273a89b..9f5a5ae 100644
--- a/src/main/java/org/apache/datasketches/kll/KllItemsSketch.java
+++ b/src/main/java/org/apache/datasketches/kll/KllItemsSketch.java
@@ -242,7 +242,6 @@
public final ItemsSketchSortedView<T> getSortedView() {
if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
return refreshSortedView();
- //return itemsSV; //SpotBugs EI_EXPOSE_REP, Suppressed by FindBugsExcludeFilter
}
@Override
@@ -454,7 +453,7 @@
blockyTandemMergeSort(quantiles, cumWeights, myLevels, numLevels, comparator); //create unit weights
KllHelper.convertToCumulative(cumWeights);
}
- }
+ } //End of class CreateSortedView
private static <T> void blockyTandemMergeSort(final Object[] quantiles, final long[] weights,
final int[] levels, final int numLevels, final Comparator<? super T> comp) {
diff --git a/src/main/java/org/apache/datasketches/quantiles/CompactDoublesSketch.java b/src/main/java/org/apache/datasketches/quantiles/CompactDoublesSketch.java
index 81c9e6d..f6df9e8 100644
--- a/src/main/java/org/apache/datasketches/quantiles/CompactDoublesSketch.java
+++ b/src/main/java/org/apache/datasketches/quantiles/CompactDoublesSketch.java
@@ -23,7 +23,7 @@
import org.apache.datasketches.memory.Memory;
/**
- * Compact sketches are inherently <i>read ony</i>.
+ * Compact sketches are inherently <i>read only</i>.
* @author Jon Malkin
*/
public abstract class CompactDoublesSketch extends DoublesSketch {
diff --git a/src/main/java/org/apache/datasketches/quantiles/DirectDoublesSketchAccessor.java b/src/main/java/org/apache/datasketches/quantiles/DirectDoublesSketchAccessor.java
index 172ad14..b95c9f1 100644
--- a/src/main/java/org/apache/datasketches/quantiles/DirectDoublesSketchAccessor.java
+++ b/src/main/java/org/apache/datasketches/quantiles/DirectDoublesSketchAccessor.java
@@ -27,6 +27,7 @@
* @author Jon Malkin
*/
final class DirectDoublesSketchAccessor extends DoublesSketchAccessor {
+
DirectDoublesSketchAccessor(final DoublesSketch ds,
final boolean forceSize,
final int level) {
diff --git a/src/main/java/org/apache/datasketches/quantiles/DirectUpdateDoublesSketch.java b/src/main/java/org/apache/datasketches/quantiles/DirectUpdateDoublesSketch.java
index 384b426..96c01d9 100644
--- a/src/main/java/org/apache/datasketches/quantiles/DirectUpdateDoublesSketch.java
+++ b/src/main/java/org/apache/datasketches/quantiles/DirectUpdateDoublesSketch.java
@@ -193,7 +193,7 @@
//bit pattern on direct is always derived, no need to save it.
}
putN(newN);
- classicQdsSV = null;
+ doublesSV = null;
}
@Override
diff --git a/src/main/java/org/apache/datasketches/quantiles/DoublesArrayAccessor.java b/src/main/java/org/apache/datasketches/quantiles/DoublesArrayAccessor.java
index dd740fb..7fcf38a 100644
--- a/src/main/java/org/apache/datasketches/quantiles/DoublesArrayAccessor.java
+++ b/src/main/java/org/apache/datasketches/quantiles/DoublesArrayAccessor.java
@@ -67,8 +67,7 @@
}
@Override
- void putArray(final double[] srcArray, final int srcIndex,
- final int dstIndex, final int numItems) {
+ void putArray(final double[] srcArray, final int srcIndex, final int dstIndex, final int numItems) {
System.arraycopy(srcArray, srcIndex, buffer_, dstIndex, numItems);
}
diff --git a/src/main/java/org/apache/datasketches/quantiles/DoublesBufferAccessor.java b/src/main/java/org/apache/datasketches/quantiles/DoublesBufferAccessor.java
index c3b0f9b..1014a10 100644
--- a/src/main/java/org/apache/datasketches/quantiles/DoublesBufferAccessor.java
+++ b/src/main/java/org/apache/datasketches/quantiles/DoublesBufferAccessor.java
@@ -23,6 +23,7 @@
* @author Jon Malkin
*/
abstract class DoublesBufferAccessor {
+
abstract double get(final int index);
abstract double set(final int index, final double quantile);
@@ -31,6 +32,5 @@
abstract double[] getArray(int fromIdx, int numItems);
- abstract void putArray(double[] srcArray, int srcIndex,
- int dstIndex, int numItems);
+ abstract void putArray(double[] srcArray, int srcIndex, int dstIndex, int numItems);
}
diff --git a/src/main/java/org/apache/datasketches/quantiles/DoublesSketch.java b/src/main/java/org/apache/datasketches/quantiles/DoublesSketch.java
index 93858c3..fd0698b 100644
--- a/src/main/java/org/apache/datasketches/quantiles/DoublesSketch.java
+++ b/src/main/java/org/apache/datasketches/quantiles/DoublesSketch.java
@@ -21,6 +21,7 @@
import static java.lang.Math.max;
import static java.lang.Math.min;
+import static java.lang.System.arraycopy;
import static org.apache.datasketches.common.Util.ceilingPowerOf2;
import static org.apache.datasketches.quantiles.ClassicUtil.MAX_PRELONGS;
import static org.apache.datasketches.quantiles.ClassicUtil.MIN_K;
@@ -28,13 +29,16 @@
import static org.apache.datasketches.quantiles.ClassicUtil.checkK;
import static org.apache.datasketches.quantiles.ClassicUtil.computeNumLevelsNeeded;
import static org.apache.datasketches.quantiles.ClassicUtil.computeRetainedItems;
+import static org.apache.datasketches.quantiles.DoublesSketchAccessor.BB_LVL_IDX;
+import java.util.Arrays;
import java.util.Random;
import org.apache.datasketches.common.SketchesArgumentException;
+import org.apache.datasketches.common.SketchesStateException;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
-import org.apache.datasketches.quantilescommon.DoublesSortedView;
+import org.apache.datasketches.quantilescommon.DoublesSketchSortedView;
import org.apache.datasketches.quantilescommon.QuantileSearchCriteria;
import org.apache.datasketches.quantilescommon.QuantilesAPI;
import org.apache.datasketches.quantilescommon.QuantilesDoublesAPI;
@@ -108,7 +112,7 @@
*/
final int k_;
- DoublesSketchSortedView classicQdsSV = null;
+ DoublesSketchSortedView doublesSV = null;
DoublesSketch(final int k) {
checkK(k);
@@ -160,7 +164,7 @@
public double[] getCDF(final double[] splitPoints, final QuantileSearchCriteria searchCrit) {
if (isEmpty()) { throw new IllegalArgumentException(QuantilesAPI.EMPTY_MSG); }
refreshSortedView();
- return classicQdsSV.getCDF(splitPoints, searchCrit);
+ return doublesSV.getCDF(splitPoints, searchCrit);
}
@Override
@@ -173,14 +177,14 @@
public double[] getPMF(final double[] splitPoints, final QuantileSearchCriteria searchCrit) {
if (isEmpty()) { throw new IllegalArgumentException(QuantilesAPI.EMPTY_MSG); }
refreshSortedView();
- return classicQdsSV.getPMF(splitPoints, searchCrit);
+ return doublesSV.getPMF(splitPoints, searchCrit);
}
@Override
public double getQuantile(final double rank, final QuantileSearchCriteria searchCrit) {
if (isEmpty()) { throw new IllegalArgumentException(QuantilesAPI.EMPTY_MSG); }
refreshSortedView();
- return classicQdsSV.getQuantile(rank, searchCrit);
+ return doublesSV.getQuantile(rank, searchCrit);
}
@Override
@@ -190,7 +194,7 @@
final int len = ranks.length;
final double[] quantiles = new double[len];
for (int i = 0; i < len; i++) {
- quantiles[i] = classicQdsSV.getQuantile(ranks[i], searchCrit);
+ quantiles[i] = doublesSV.getQuantile(ranks[i], searchCrit);
}
return quantiles;
}
@@ -219,7 +223,7 @@
public double getRank(final double quantile, final QuantileSearchCriteria searchCrit) {
if (isEmpty()) { throw new IllegalArgumentException(QuantilesAPI.EMPTY_MSG); }
refreshSortedView();
- return classicQdsSV.getRank(quantile, searchCrit);
+ return doublesSV.getRank(quantile, searchCrit);
}
/**
@@ -249,7 +253,7 @@
final int len = quantiles.length;
final double[] ranks = new double[len];
for (int i = 0; i < len; i++) {
- ranks[i] = classicQdsSV.getRank(quantiles[i], searchCrit);
+ ranks[i] = doublesSV.getRank(quantiles[i], searchCrit);
}
return ranks;
}
@@ -508,11 +512,6 @@
return new DoublesSketchIterator(this, getBitPattern());
}
- @Override
- public DoublesSortedView getSortedView() {
- return new DoublesSketchSortedView(this);
- }
-
/**
* {@inheritDoc}
* <p>The parameter <i>k</i> will not change.</p>
@@ -538,10 +537,6 @@
return newSketch;
}
-private final void refreshSortedView() {
- classicQdsSV = (classicQdsSV == null) ? new DoublesSketchSortedView(this) : classicQdsSV;
-}
-
//Restricted abstract
/**
@@ -579,4 +574,225 @@
* @return the Memory if it exists, otherwise returns null.
*/
abstract WritableMemory getMemory();
+
+ //************SORTED VIEW****************************
+
+ @Override
+ public final DoublesSketchSortedView getSortedView() {
+ return refreshSortedView();
+ }
+
+ private final DoublesSketchSortedView refreshSortedView() {
+ return (doublesSV == null) ? (doublesSV = getSV()) : doublesSV;
+ }
+
+ private DoublesSketchSortedView getSV() {
+ final long totalN = getN();
+ if (isEmpty() || (totalN == 0)) { throw new SketchesArgumentException(EMPTY_MSG); }
+ final int numQuantiles = getNumRetained();
+ final double[] svQuantiles = new double[numQuantiles];
+ final long[] svCumWeights = new long[numQuantiles];
+ final DoublesSketchAccessor sketchAccessor = DoublesSketchAccessor.wrap(this);
+
+ // Populate from DoublesSketch:
+ // copy over the "levels" and then the base buffer, all with appropriate weights
+ populateFromDoublesSketch(getK(), totalN, getBitPattern(), sketchAccessor, svQuantiles, svCumWeights);
+
+ // Sort the first "numQuantiles" slots of the two arrays in tandem,
+ // taking advantage of the already sorted blocks of length k
+ blockyTandemMergeSort(svQuantiles, svCumWeights, numQuantiles, getK());
+
+ if (convertToCumulative(svCumWeights) != totalN) {
+ throw new SketchesStateException("Sorted View is misconfigured. TotalN does not match cumWeights.");
+ }
+ return new DoublesSketchSortedView(svQuantiles, svCumWeights, totalN, getMaxItem(), getMinItem());
+ }
+
+ private final static void populateFromDoublesSketch(
+ final int k, final long totalN, final long bitPattern,
+ final DoublesSketchAccessor sketchAccessor,
+ final double[] svQuantiles, final long[] svCumWeights) {
+ long weight = 1;
+ int index = 0;
+ long bits = bitPattern;
+ assert bits == (totalN / (2L * k)); // internal consistency check
+ for (int lvl = 0; bits != 0L; lvl++, bits >>>= 1) {
+ weight <<= 1; // X2
+ if ((bits & 1L) > 0L) {
+ sketchAccessor.setLevel(lvl);
+ for (int i = 0; i < sketchAccessor.numItems(); i++) {
+ svQuantiles[index] = sketchAccessor.get(i);
+ svCumWeights[index] = weight;
+ index++;
+ }
+ }
+ }
+
+ weight = 1; //NOT a mistake! We just copied the highest level; now we need to copy the base buffer
+ final int startOfBaseBufferBlock = index;
+
+ // Copy BaseBuffer over, along with weight = 1
+ sketchAccessor.setLevel(BB_LVL_IDX);
+ for (int i = 0; i < sketchAccessor.numItems(); i++) {
+ svQuantiles[index] = sketchAccessor.get(i);
+ svCumWeights[index] = weight;
+ index++;
+ }
+ assert index == svQuantiles.length;
+
+ // Must sort the items that came from the base buffer.
+ // Don't need to sort the corresponding weights because they are all the same.
+ final int numSamples = index;
+ Arrays.sort(svQuantiles, startOfBaseBufferBlock, numSamples);
+ }
+
+ /**
+ * blockyTandemMergeSort() is an implementation of top-down merge sort specialized
+ * for the case where the input contains successive equal-length blocks
+ * that have already been sorted, so that only the top part of the
+ * merge tree remains to be executed. Also, two arrays are sorted in tandem,
+ * as discussed below.
+ * @param svQuantiles array of quantiles for sorted view
+ * @param svCumWts array of individual weights for the sorted view (made cumulative afterwards by the caller)
+ * @param arrLen length of quantiles array and cumWts array
+ * @param blkSize size of internal sorted blocks, equal to k
+ */
+ //used by this and UtilTest
+ static void blockyTandemMergeSort(final double[] svQuantiles, final long[] svCumWts, final int arrLen,
+ final int blkSize) {
+ assert blkSize >= 1;
+ if (arrLen <= blkSize) { return; }
+ int numblks = arrLen / blkSize;
+ if ((numblks * blkSize) < arrLen) { numblks += 1; }
+ assert ((numblks * blkSize) >= arrLen);
+
+ // duplication of the input arrays is preparation for the "ping-pong" copy reduction strategy.
+ final double[] qSrc = Arrays.copyOf(svQuantiles, arrLen);
+ final long[] cwSrc = Arrays.copyOf(svCumWts, arrLen);
+
+ blockyTandemMergeSortRecursion(qSrc, cwSrc,
+ svQuantiles, svCumWts,
+ 0, numblks,
+ blkSize, arrLen);
+ }
+
+ /**
+ * blockyTandemMergeSortRecursion() is called by blockyTandemMergeSort().
+ * In addition to performing the algorithm's top down recursion,
+ * it manages the buffer swapping that eliminates most copying.
+ * It also maps the input's pre-sorted blocks into the subarrays
+ * that are processed by tandemMerge().
+ * @param qSrc source array of quantiles
+ * @param cwSrc source weights array
+ * @param qDst destination quantiles array
+ * @param cwDst destination weights array
+ * @param grpStart group start, refers to pre-sorted blocks such as block 0, block 1, etc.
+ * @param grpLen group length, refers to pre-sorted blocks such as block 0, block 1, etc.
+ * @param blkSize block size
+ * @param arrLim array limit
+ */
+ private static void blockyTandemMergeSortRecursion(final double[] qSrc, final long[] cwSrc,
+ final double[] qDst, final long[] cwDst, final int grpStart, final int grpLen,
+ /* indices of blocks */ final int blkSize, final int arrLim) {
+ // Important note: grpStart and grpLen do NOT refer to positions in the underlying array.
+ // Instead, they refer to the pre-sorted blocks, such as block 0, block 1, etc.
+
+ assert (grpLen > 0);
+ if (grpLen == 1) { return; }
+ final int grpLen1 = grpLen / 2;
+ final int grpLen2 = grpLen - grpLen1;
+ assert (grpLen1 >= 1);
+ assert (grpLen2 >= grpLen1);
+
+ final int grpStart1 = grpStart;
+ final int grpStart2 = grpStart + grpLen1;
+
+ //swap roles of src and dst
+ blockyTandemMergeSortRecursion(qDst, cwDst,
+ qSrc, cwSrc,
+ grpStart1, grpLen1, blkSize, arrLim);
+
+ //swap roles of src and dst
+ blockyTandemMergeSortRecursion(qDst, cwDst,
+ qSrc, cwSrc,
+ grpStart2, grpLen2, blkSize, arrLim);
+
+ // here we convert indices of blocks into positions in the underlying array.
+ final int arrStart1 = grpStart1 * blkSize;
+ final int arrStart2 = grpStart2 * blkSize;
+ final int arrLen1 = grpLen1 * blkSize;
+ int arrLen2 = grpLen2 * blkSize;
+
+ // special case for the final block which might be shorter than blkSize.
+ if ((arrStart2 + arrLen2) > arrLim) { arrLen2 = arrLim - arrStart2; }
+
+ tandemMerge(qSrc, cwSrc,
+ arrStart1, arrLen1,
+ arrStart2, arrLen2,
+ qDst, cwDst,
+ arrStart1); // which will be arrStart3
+ }
+
+ /**
+ * Performs two merges in tandem. One of them provides the sort keys
+ * while the other one passively undergoes the same data motion.
+ * @param qSrc quantiles source
+ * @param cwSrc cumulative weights source
+ * @param arrStart1 Array 1 start offset
+ * @param arrLen1 Array 1 length
+ * @param arrStart2 Array 2 start offset
+ * @param arrLen2 Array 2 length
+ * @param qDst quantiles destination
+ * @param cwDst cumulative weights destination
+ * @param arrStart3 Array 3 start offset
+ */
+ private static void tandemMerge(final double[] qSrc, final long[] cwSrc,
+ final int arrStart1, final int arrLen1,
+ final int arrStart2, final int arrLen2,
+ final double[] qDst, final long[] cwDst,
+ final int arrStart3) {
+ final int arrStop1 = arrStart1 + arrLen1;
+ final int arrStop2 = arrStart2 + arrLen2;
+
+ int i1 = arrStart1;
+ int i2 = arrStart2;
+ int i3 = arrStart3;
+ while ((i1 < arrStop1) && (i2 < arrStop2)) {
+ if (qSrc[i2] < qSrc[i1]) {
+ qDst[i3] = qSrc[i2];
+ cwDst[i3] = cwSrc[i2];
+ i2++;
+ } else {
+ qDst[i3] = qSrc[i1];
+ cwDst[i3] = cwSrc[i1];
+ i1++;
+ }
+ i3++;
+ }
+
+ if (i1 < arrStop1) {
+ arraycopy(qSrc, i1, qDst, i3, arrStop1 - i1);
+ arraycopy(cwSrc, i1, cwDst, i3, arrStop1 - i1);
+ } else {
+ assert i2 < arrStop2;
+ arraycopy(qSrc, i2, qDst, i3, arrStop2 - i2);
+ arraycopy(cwSrc, i2, cwDst, i3, arrStop2 - i2);
+ }
+ }
+
+ /**
+ * Convert the individual weights into cumulative weights.
+ * An array of {1,1,1,1} becomes {1,2,3,4}
+ * @param array of actual weights from the sketch, none of the weights may be zero
+ * @return total weight
+ */
+ private static long convertToCumulative(final long[] array) {
+ long subtotal = 0;
+ for (int i = 0; i < array.length; i++) {
+ final long newSubtotal = subtotal + array[i];
+ subtotal = array[i] = newSubtotal;
+ }
+ return subtotal;
+ }
+
}
diff --git a/src/main/java/org/apache/datasketches/quantiles/DoublesSketchSortedView.java b/src/main/java/org/apache/datasketches/quantiles/DoublesSketchSortedView.java
deleted file mode 100644
index bb68061..0000000
--- a/src/main/java/org/apache/datasketches/quantiles/DoublesSketchSortedView.java
+++ /dev/null
@@ -1,354 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.datasketches.quantiles;
-
-import static java.lang.System.arraycopy;
-import static org.apache.datasketches.quantiles.DoublesSketchAccessor.BB_LVL_IDX;
-import static org.apache.datasketches.quantilescommon.QuantileSearchCriteria.INCLUSIVE;
-import static org.apache.datasketches.quantilescommon.QuantilesAPI.EMPTY_MSG;
-import static org.apache.datasketches.quantilescommon.QuantilesUtil.getNaturalRank;
-
-import java.util.Arrays;
-
-import org.apache.datasketches.common.SketchesArgumentException;
-import org.apache.datasketches.common.SketchesStateException;
-import org.apache.datasketches.quantilescommon.DoublesSortedView;
-import org.apache.datasketches.quantilescommon.DoublesSortedViewIterator;
-import org.apache.datasketches.quantilescommon.InequalitySearch;
-import org.apache.datasketches.quantilescommon.QuantileSearchCriteria;
-import org.apache.datasketches.quantilescommon.QuantilesUtil;
-
-/**
- * The SortedView of the Classic Quantiles DoublesSketch.
- * @author Alexander Saydakov
- * @author Lee Rhodes
- */
-public final class DoublesSketchSortedView implements DoublesSortedView {
- private final double[] quantiles;
- private final long[] cumWeights; //comes in as individual weights, converted to cumulative natural weights
- private final long totalN;
- private final double maxItem;
- private final double minItem;
-
- /**
- * Construct from elements, also used in testing.
- * @param quantiles sorted array of quantiles
- * @param cumWeights sorted, monotonically increasing cumulative weights.
- * @param totalN the total number of items presented to the sketch.
- * @param maxItem of type double
- * @param minItem of type double
- */
- DoublesSketchSortedView(final double[] quantiles, final long[] cumWeights, final long totalN,
- final double maxItem, final double minItem) {
- this.quantiles = quantiles;
- this.cumWeights = cumWeights;
- this.totalN = totalN;
- this.maxItem = maxItem;
- this.minItem = minItem;
- }
-
- /**
- * Constructs this Sorted View given the sketch
- * @param sketch the given Classic Quantiles DoublesSketch
- */
- public DoublesSketchSortedView(final DoublesSketch sketch) {
- if (sketch.isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
- this.totalN = sketch.getN();
- this.maxItem = sketch.getMaxItem();
- this.minItem = sketch.getMinItem();
- final int k = sketch.getK();
- final int numQuantiles = sketch.getNumRetained();
- quantiles = new double[numQuantiles];
- cumWeights = new long[numQuantiles];
- final DoublesSketchAccessor sketchAccessor = DoublesSketchAccessor.wrap(sketch);
-
- // Populate from DoublesSketch:
- // copy over the "levels" and then the base buffer, all with appropriate weights
- populateFromDoublesSketch(k, totalN, sketch.getBitPattern(), sketchAccessor, quantiles, cumWeights);
-
- // Sort the first "numSamples" slots of the two arrays in tandem,
- // taking advantage of the already sorted blocks of length k
- blockyTandemMergeSort(quantiles, cumWeights, numQuantiles, k);
- if (convertToCumulative(cumWeights) != totalN) {
- throw new SketchesStateException("Sorted View is misconfigured. TotalN does not match cumWeights.");
- }
- }
-
- @Override
- public long[] getCumulativeWeights() {
- return cumWeights.clone();
- }
-
- @Override
- public double getMaxItem() {
- return maxItem;
- }
-
- @Override
- public double getMinItem() {
- return minItem;
- }
-
- @Override
- public long getN() {
- return totalN;
- }
-
- @Override
- public double getQuantile(final double rank, final QuantileSearchCriteria searchCrit) {
- if (isEmpty()) { throw new IllegalArgumentException(EMPTY_MSG); }
- QuantilesUtil.checkNormalizedRankBounds(rank);
- final int len = cumWeights.length;
- final double naturalRank = getNaturalRank(rank, totalN, searchCrit);
- final InequalitySearch crit = (searchCrit == INCLUSIVE) ? InequalitySearch.GE : InequalitySearch.GT;
- final int index = InequalitySearch.find(cumWeights, 0, len - 1, naturalRank, crit);
- if (index == -1) {
- return quantiles[len - 1]; //EXCLUSIVE (GT) case: normRank == 1.0;
- }
- return quantiles[index];
- }
-
- @Override
- public double getRank(final double quantile, final QuantileSearchCriteria searchCrit) {
- if (isEmpty()) { throw new IllegalArgumentException(EMPTY_MSG); }
- final int len = quantiles.length;
- final InequalitySearch crit = (searchCrit == INCLUSIVE) ? InequalitySearch.LE : InequalitySearch.LT;
- final int index = InequalitySearch.find(quantiles, 0, len - 1, quantile, crit);
- if (index == -1) {
- return 0; //EXCLUSIVE (LT) case: quantile <= minQuantile; INCLUSIVE (LE) case: quantile < minQuantile
- }
- return (double)cumWeights[index] / totalN;
- }
-
- @Override
- public double[] getQuantiles() {
- return quantiles.clone();
- }
-
- @Override
- public boolean isEmpty() {
- return totalN == 0;
- }
-
- @Override
- public DoublesSortedViewIterator iterator() {
- return new DoublesSortedViewIterator(quantiles, cumWeights);
- }
-
- //restricted methods
-
- /**
- * Populate the arrays and registers from a DoublesSketch
- * @param k K parameter of the sketch
- * @param n The current size of the stream
- * @param bitPattern the bit pattern for valid log levels
- * @param sketchAccessor A DoublesSketchAccessor around the sketch
- * @param quantilesArr the consolidated array of all items from the sketch
- * @param cumWtsArr populates this array with the raw individual weights from the sketch,
- * it will be cumulative later.
- */
- private final static void populateFromDoublesSketch(
- final int k, final long n, final long bitPattern,
- final DoublesSketchAccessor sketchAccessor,
- final double[] quantilesArr, final long[] cumWtsArr) {
- long weight = 1;
- int nxt = 0;
- long bits = bitPattern;
- assert bits == (n / (2L * k)); // internal consistency check
- for (int lvl = 0; bits != 0L; lvl++, bits >>>= 1) {
- weight *= 2;
- if ((bits & 1L) > 0L) {
- sketchAccessor.setLevel(lvl);
- for (int i = 0; i < sketchAccessor.numItems(); i++) {
- quantilesArr[nxt] = sketchAccessor.get(i);
- cumWtsArr[nxt] = weight;
- nxt++;
- }
- }
- }
-
- weight = 1; //NOT a mistake! We just copied the highest level; now we need to copy the base buffer
- final int startOfBaseBufferBlock = nxt;
-
- // Copy BaseBuffer over, along with weight = 1
- sketchAccessor.setLevel(BB_LVL_IDX);
- for (int i = 0; i < sketchAccessor.numItems(); i++) {
- quantilesArr[nxt] = sketchAccessor.get(i);
- cumWtsArr[nxt] = weight;
- nxt++;
- }
- assert nxt == quantilesArr.length;
-
- // Must sort the items that came from the base buffer.
- // Don't need to sort the corresponding weights because they are all the same.
- final int numSamples = nxt;
- Arrays.sort(quantilesArr, startOfBaseBufferBlock, numSamples);
- }
-
- /**
- * blockyTandemMergeSort() is an implementation of top-down merge sort specialized
- * for the case where the input contains successive equal-length blocks
- * that have already been sorted, so that only the top part of the
- * merge tree remains to be executed. Also, two arrays are sorted in tandem,
- * as discussed below.
- * @param quantiles array of quantiles
- * @param cumWts array of cum weights
- * @param arrLen length of quantiles array and cumWts array
- * @param blkSize size of internal sorted blocks
- */
- //used by this and UtilTest
- static void blockyTandemMergeSort(final double[] quantiles, final long[] cumWts, final int arrLen,
- final int blkSize) {
- assert blkSize >= 1;
- if (arrLen <= blkSize) { return; }
- int numblks = arrLen / blkSize;
- if ((numblks * blkSize) < arrLen) { numblks += 1; }
- assert ((numblks * blkSize) >= arrLen);
-
- // duplication of the input arrays is preparation for the "ping-pong" copy reduction strategy.
- final double[] qSrc = Arrays.copyOf(quantiles, arrLen);
- final long[] cwSrc = Arrays.copyOf(cumWts, arrLen);
-
- blockyTandemMergeSortRecursion(qSrc, cwSrc,
- quantiles, cumWts,
- 0, numblks,
- blkSize, arrLen);
- }
-
- /**
- * blockyTandemMergeSortRecursion() is called by blockyTandemMergeSort().
- * In addition to performing the algorithm's top down recursion,
- * it manages the buffer swapping that eliminates most copying.
- * It also maps the input's pre-sorted blocks into the subarrays
- * that are processed by tandemMerge().
- * @param qSrc source array of quantiles
- * @param cwSrc source weights array
- * @param qDst destination quantiles array
- * @param cwDst destination weights array
- * @param grpStart group start, refers to pre-sorted blocks such as block 0, block 1, etc.
- * @param grpLen group length, refers to pre-sorted blocks such as block 0, block 1, etc.
- * @param blkSize block size
- * @param arrLim array limit
- */
- private static void blockyTandemMergeSortRecursion(final double[] qSrc, final long[] cwSrc,
- final double[] qDst, final long[] cwDst, final int grpStart, final int grpLen,
- /* indices of blocks */ final int blkSize, final int arrLim) {
- // Important note: grpStart and grpLen do NOT refer to positions in the underlying array.
- // Instead, they refer to the pre-sorted blocks, such as block 0, block 1, etc.
-
- assert (grpLen > 0);
- if (grpLen == 1) { return; }
- final int grpLen1 = grpLen / 2;
- final int grpLen2 = grpLen - grpLen1;
- assert (grpLen1 >= 1);
- assert (grpLen2 >= grpLen1);
-
- final int grpStart1 = grpStart;
- final int grpStart2 = grpStart + grpLen1;
-
- //swap roles of src and dst
- blockyTandemMergeSortRecursion(qDst, cwDst,
- qSrc, cwSrc,
- grpStart1, grpLen1, blkSize, arrLim);
-
- //swap roles of src and dst
- blockyTandemMergeSortRecursion(qDst, cwDst,
- qSrc, cwSrc,
- grpStart2, grpLen2, blkSize, arrLim);
-
- // here we convert indices of blocks into positions in the underlying array.
- final int arrStart1 = grpStart1 * blkSize;
- final int arrStart2 = grpStart2 * blkSize;
- final int arrLen1 = grpLen1 * blkSize;
- int arrLen2 = grpLen2 * blkSize;
-
- // special case for the final block which might be shorter than blkSize.
- if ((arrStart2 + arrLen2) > arrLim) { arrLen2 = arrLim - arrStart2; }
-
- tandemMerge(qSrc, cwSrc,
- arrStart1, arrLen1,
- arrStart2, arrLen2,
- qDst, cwDst,
- arrStart1); // which will be arrStart3
- }
-
- /**
- * Performs two merges in tandem. One of them provides the sort keys
- * while the other one passively undergoes the same data motion.
- * @param qSrc quantiles source
- * @param cwSrc cumulative weights source
- * @param arrStart1 Array 1 start offset
- * @param arrLen1 Array 1 length
- * @param arrStart2 Array 2 start offset
- * @param arrLen2 Array 2 length
- * @param qDst quantiles destination
- * @param cwDst cumulative weights destination
- * @param arrStart3 Array 3 start offset
- */
- private static void tandemMerge(final double[] qSrc, final long[] cwSrc,
- final int arrStart1, final int arrLen1,
- final int arrStart2, final int arrLen2,
- final double[] qDst, final long[] cwDst,
- final int arrStart3) {
- final int arrStop1 = arrStart1 + arrLen1;
- final int arrStop2 = arrStart2 + arrLen2;
-
- int i1 = arrStart1;
- int i2 = arrStart2;
- int i3 = arrStart3;
- while ((i1 < arrStop1) && (i2 < arrStop2)) {
- if (qSrc[i2] < qSrc[i1]) {
- qDst[i3] = qSrc[i2];
- cwDst[i3] = cwSrc[i2];
- i2++;
- } else {
- qDst[i3] = qSrc[i1];
- cwDst[i3] = cwSrc[i1];
- i1++;
- }
- i3++;
- }
-
- if (i1 < arrStop1) {
- arraycopy(qSrc, i1, qDst, i3, arrStop1 - i1);
- arraycopy(cwSrc, i1, cwDst, i3, arrStop1 - i1);
- } else {
- assert i2 < arrStop2;
- arraycopy(qSrc, i2, qDst, i3, arrStop2 - i2);
- arraycopy(cwSrc, i2, cwDst, i3, arrStop2 - i2);
- }
- }
-
- /**
- * Convert the individual weights into cumulative weights.
- * An array of {1,1,1,1} becomes {1,2,3,4}
- * @param array of actual weights from the sketch, none of the weights may be zero
- * @return total weight
- */
- private static long convertToCumulative(final long[] array) {
- long subtotal = 0;
- for (int i = 0; i < array.length; i++) {
- final long newSubtotal = subtotal + array[i];
- subtotal = array[i] = newSubtotal;
- }
- return subtotal;
- }
-
-}
diff --git a/src/main/java/org/apache/datasketches/quantiles/DoublesUnionImpl.java b/src/main/java/org/apache/datasketches/quantiles/DoublesUnionImpl.java
index b55f0b5..38854d3 100644
--- a/src/main/java/org/apache/datasketches/quantiles/DoublesUnionImpl.java
+++ b/src/main/java/org/apache/datasketches/quantiles/DoublesUnionImpl.java
@@ -123,14 +123,14 @@
public void union(final DoublesSketch sketchIn) {
Objects.requireNonNull(sketchIn);
gadget_ = updateLogic(maxK_, gadget_, sketchIn);
- gadget_.classicQdsSV = null;
+ gadget_.doublesSV = null;
}
@Override
public void union(final Memory mem) {
Objects.requireNonNull(mem);
gadget_ = updateLogic(maxK_, gadget_, DoublesSketch.wrap(mem));
- gadget_.classicQdsSV = null;
+ gadget_.doublesSV = null;
}
@Override
@@ -139,7 +139,7 @@
gadget_ = HeapUpdateDoublesSketch.newInstance(maxK_);
}
gadget_.update(quantile);
- gadget_.classicQdsSV = null;
+ gadget_.doublesSV = null;
}
@Override
diff --git a/src/main/java/org/apache/datasketches/quantiles/HeapUpdateDoublesSketch.java b/src/main/java/org/apache/datasketches/quantiles/HeapUpdateDoublesSketch.java
index 8282f00..2197d8b 100644
--- a/src/main/java/org/apache/datasketches/quantiles/HeapUpdateDoublesSketch.java
+++ b/src/main/java/org/apache/datasketches/quantiles/HeapUpdateDoublesSketch.java
@@ -270,7 +270,7 @@
baseBufferCount_ = newBBCount;
}
n_ = newN;
- classicQdsSV = null;
+ doublesSV = null;
}
/**
diff --git a/src/main/java/org/apache/datasketches/quantiles/ItemsSketch.java b/src/main/java/org/apache/datasketches/quantiles/ItemsSketch.java
index 54795d7..13aee81 100644
--- a/src/main/java/org/apache/datasketches/quantiles/ItemsSketch.java
+++ b/src/main/java/org/apache/datasketches/quantiles/ItemsSketch.java
@@ -531,12 +531,6 @@
}
@Override
- public ItemsSketchSortedView<T> getSortedView() {
- if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
- return refreshSortedView();
- }
-
- @Override
public void update(final T item) {
// this method only uses the base buffer part of the combined buffer
@@ -631,69 +625,84 @@
sketch.combinedBuffer_ = Arrays.copyOf(baseBuffer, newSize);
}
- private final ItemsSketchSortedView<T> refreshSortedView() {
- if (classicQisSV == null) {
- final CreateSortedView csv = new CreateSortedView();
- classicQisSV = csv.getSV();
- }
- return classicQisSV;
+ //************SORTED VIEW****************************
+
+ @Override
+ public ItemsSketchSortedView<T> getSortedView() {
+ return refreshSortedView();
}
- @SuppressWarnings({"rawtypes","unchecked"})
- private final class CreateSortedView {
- final long n = getN();
- final int numQuantiles = getNumRetained();
- final T[] quantiles = (T[]) Array.newInstance(clazz, numQuantiles);
- long[] cumWeights = new long[numQuantiles];
- final int k = getK();
+ private final ItemsSketchSortedView<T> refreshSortedView() {
+ return (classicQisSV == null) ? (classicQisSV = getSV(this)) : classicQisSV;
+ }
- final T[] combinedBuffer = (T[]) getCombinedBuffer();
- final int baseBufferCount = getBaseBufferCount();
- final Comparator<? super T> comparator = ItemsSketch.this.comparator_;
+ @SuppressWarnings({"unchecked"})
+ private static <T> ItemsSketchSortedView<T> getSV(final ItemsSketch<T> sk) {
+ final long totalN = sk.getN();
+ if (sk.isEmpty() || (totalN == 0)) { throw new SketchesArgumentException(EMPTY_MSG); }
+ final int k = sk.getK();
+ final int numQuantiles = sk.getNumRetained();
+ final T[] svQuantiles = (T[]) Array.newInstance(sk.clazz, numQuantiles);
+ final long[] svCumWeights = new long[numQuantiles];
+ final Comparator<? super T> comparator = sk.comparator_;
- ItemsSketchSortedView<T> getSV() {
- long weight = 1;
- int index = 0;
- long bits = getBitPattern();
- assert bits == (n / (2L * k)); // internal consistency check
- for (int lvl = 0; bits != 0L; lvl++, bits >>>= 1) {
- weight *= 2;
- if ((bits & 1L) > 0L) {
- final int offset = (2 + lvl) * k;
- for (int i = 0; i < k; i++) {
- quantiles[index] = combinedBuffer[i + offset];
- cumWeights[index] = weight;
- index++;
- }
+ final T[] combinedBuffer = (T[]) sk.getCombinedBuffer();
+ final int baseBufferCount = sk.getBaseBufferCount();
+
+ // Populate from ItemsSketch:
+ // copy over the "levels" and then the base buffer, all with appropriate weights
+ populateFromItemsSketch(k, totalN, sk.getBitPattern(), combinedBuffer, baseBufferCount,
+ numQuantiles, svQuantiles, svCumWeights, sk.getComparator());
+
+ // Sort the first "numQuantiles" slots of the two arrays in tandem,
+ // taking advantage of the already sorted blocks of length k
+ ItemsMergeImpl.blockyTandemMergeSort(svQuantiles, svCumWeights, numQuantiles, k, comparator);
+
+ if (convertToCumulative(svCumWeights) != totalN) {
+ throw new SketchesStateException("Sorted View is misconfigured. TotalN does not match cumWeights.");
+ }
+
+ final double normRankErr = getNormalizedRankError(sk.getK(), true);
+ return new ItemsSketchSortedView<>(
+ svQuantiles, svCumWeights, sk.getN(), comparator, sk.getMaxItem(), sk.getMinItem(), normRankErr);
+
+ }
+
+ private final static <T> void populateFromItemsSketch(
+ final int k, final long totalN, final long bitPattern, final T[] combinedBuffer,
+ final int baseBufferCount, final int numQuantiles, final T[] svQuantiles, final long[] svCumWeights,
+ final Comparator<? super T> comparator) {
+
+ long weight = 1;
+ int index = 0;
+ long bits = bitPattern;
+ assert bits == (totalN / (2L * k)); // internal consistency check
+ for (int lvl = 0; bits != 0L; lvl++, bits >>>= 1) {
+ weight <<= 1; // X2
+ if ((bits & 1L) > 0L) {
+ final int offset = (2 + lvl) * k;
+ for (int i = 0; i < k; i++) {
+ svQuantiles[index] = combinedBuffer[i + offset];
+ svCumWeights[index] = weight;
+ index++;
}
}
-
- weight = 1; //NOT a mistake! We just copied the highest level; now we need to copy the base buffer
- final int startOfBaseBufferBlock = index;
-
- // Copy BaseBuffer over, along with weight = 1
- for (int i = 0; i < baseBufferCount; i++) {
- quantiles[index] = combinedBuffer[i];
- cumWeights[index] = weight;
- index++;
- }
- assert index == numQuantiles;
-
- // Must sort the items that came from the base buffer.
- // Don't need to sort the corresponding weights because they are all the same.
- Arrays.sort(quantiles, startOfBaseBufferBlock, numQuantiles, comparator);
-
- // Sort the first "numSamples" slots of the two arrays in tandem,
- // taking advantage of the already sorted blocks of length k
- ItemsMergeImpl.blockyTandemMergeSort(quantiles, cumWeights, numQuantiles, k, comparator);
-
- if (convertToCumulative(cumWeights) != n) {
- throw new SketchesStateException("Sorted View is misconfigured. TotalN does not match cumWeights.");
- }
- final double normRankErr = getNormalizedRankError(getK(), true);
- return new ItemsSketchSortedView(
- quantiles, cumWeights, getN(), comparator, getMaxItem(), getMinItem(), normRankErr);
}
+
+ weight = 1; //NOT a mistake! We just copied the highest level; now we need to copy the base buffer
+ final int startOfBaseBufferBlock = index;
+
+ // Copy BaseBuffer over, along with weight = 1
+ for (int i = 0; i < baseBufferCount; i++) {
+ svQuantiles[index] = combinedBuffer[i];
+ svCumWeights[index] = weight;
+ index++;
+ }
+ assert index == numQuantiles;
+
+ // Must sort the items that came from the base buffer.
+ // Don't need to sort the corresponding weights because they are all the same.
+ Arrays.sort(svQuantiles, startOfBaseBufferBlock, numQuantiles, comparator);
}
/**
diff --git a/src/main/java/org/apache/datasketches/quantiles/KolmogorovSmirnov.java b/src/main/java/org/apache/datasketches/quantiles/KolmogorovSmirnov.java
index c290b05..6b8079e 100644
--- a/src/main/java/org/apache/datasketches/quantiles/KolmogorovSmirnov.java
+++ b/src/main/java/org/apache/datasketches/quantiles/KolmogorovSmirnov.java
@@ -19,6 +19,8 @@
package org.apache.datasketches.quantiles;
+import org.apache.datasketches.quantilescommon.DoublesSketchSortedView;
+
/**
* Kolmogorov-Smirnov Test
* See <a href="https://en.wikipedia.org/wiki/Kolmogorov-Smirnov_test">Kolmogorov–Smirnov Test</a>
@@ -36,8 +38,8 @@
* @return the raw delta area between two quantile sketches
*/
public static double computeKSDelta(final DoublesSketch sketch1, final DoublesSketch sketch2) {
- final DoublesSketchSortedView p = new DoublesSketchSortedView(sketch1);
- final DoublesSketchSortedView q = new DoublesSketchSortedView(sketch2);
+ final DoublesSketchSortedView p = sketch1.getSortedView();
+ final DoublesSketchSortedView q = sketch2.getSortedView();
final double[] pSamplesArr = p.getQuantiles();
final double[] qSamplesArr = q.getQuantiles();
diff --git a/src/main/java/org/apache/datasketches/quantilescommon/DoublesSketchSortedView.java b/src/main/java/org/apache/datasketches/quantilescommon/DoublesSketchSortedView.java
new file mode 100644
index 0000000..b033236
--- /dev/null
+++ b/src/main/java/org/apache/datasketches/quantilescommon/DoublesSketchSortedView.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.datasketches.quantilescommon;
+
+import static org.apache.datasketches.quantilescommon.QuantileSearchCriteria.INCLUSIVE;
+import static org.apache.datasketches.quantilescommon.QuantilesAPI.EMPTY_MSG;
+import static org.apache.datasketches.quantilescommon.QuantilesUtil.getNaturalRank;
+
+import org.apache.datasketches.common.SketchesArgumentException;
+
+/**
+ * The SortedView of the Quantiles Classic DoublesSketch and the KllDoublesSketch: parallel arrays of sorted quantiles and their natural cumulative weights.
+ * @author Alexander Saydakov
+ * @author Lee Rhodes
+ */
+public final class DoublesSketchSortedView implements DoublesSortedView {
+ private final double[] quantiles; //sorted quantiles, index-parallel to cumWeights
+ private final long[] cumWeights; //natural cumulative weights, index-parallel to quantiles; stored exactly as passed in, not converted here
+ private final long totalN; //total number of items presented to the sketch; 0 means this view is empty
+ private final double maxItem;
+ private final double minItem;
+
+ /**
+ * Construct from elements, also used in testing. The arrays are stored by reference, not copied.
+ * @param quantiles sorted array of quantiles, index-parallel to cumWeights
+ * @param cumWeights sorted, monotonically increasing natural cumulative weights, one per quantile.
+ * @param totalN the total number of items presented to the sketch.
+ * @param maxItem the maximum item, as returned by {@link #getMaxItem()}
+ * @param minItem the minimum item, as returned by {@link #getMinItem()}
+ */
+ public DoublesSketchSortedView(final double[] quantiles, final long[] cumWeights, final long totalN,
+ final double maxItem, final double minItem) {
+ this.quantiles = quantiles;
+ this.cumWeights = cumWeights;
+ this.totalN = totalN;
+ this.maxItem = maxItem;
+ this.minItem = minItem;
+ }
+
+ @Override
+ public long[] getCumulativeWeights() {
+ return cumWeights.clone(); //return a copy so callers cannot mutate this view's array
+ }
+
+ @Override
+ public double getMaxItem() {
+ return maxItem;
+ }
+
+ @Override
+ public double getMinItem() {
+ return minItem;
+ }
+
+ @Override
+ public long getN() {
+ return totalN;
+ }
+
+ @Override
+ public double getQuantile(final double rank, final QuantileSearchCriteria searchCrit) {
+ if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
+ QuantilesUtil.checkNormalizedRankBounds(rank);
+ final int len = cumWeights.length;
+ final double naturalRank = getNaturalRank(rank, totalN, searchCrit); //normalized rank -> natural (item-count) rank
+ final InequalitySearch crit = (searchCrit == INCLUSIVE) ? InequalitySearch.GE : InequalitySearch.GT;
+ final int index = InequalitySearch.find(cumWeights, 0, len - 1, naturalRank, crit);
+ if (index == -1) {
+ return quantiles[len - 1]; //EXCLUSIVE (GT) case: normRank == 1.0;
+ }
+ return quantiles[index];
+ }
+
+ @Override
+ public double[] getQuantiles() {
+ return quantiles.clone(); //return a copy so callers cannot mutate this view's array
+ }
+
+ @Override
+ public double getRank(final double quantile, final QuantileSearchCriteria searchCrit) {
+ if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
+ final int len = quantiles.length;
+ final InequalitySearch crit = (searchCrit == INCLUSIVE) ? InequalitySearch.LE : InequalitySearch.LT;
+ final int index = InequalitySearch.find(quantiles, 0, len - 1, quantile, crit);
+ if (index == -1) {
+ return 0; //EXCLUSIVE (LT) case: quantile <= minQuantile; INCLUSIVE (LE) case: quantile < minQuantile
+ }
+ return (double)cumWeights[index] / totalN; //natural cumulative weight -> normalized rank
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return totalN == 0;
+ }
+
+ @Override
+ public DoublesSortedViewIterator iterator() {
+ return new DoublesSortedViewIterator(quantiles, cumWeights);
+ }
+
+}
diff --git a/src/main/java/org/apache/datasketches/tdigest/TDigestDouble.java b/src/main/java/org/apache/datasketches/tdigest/TDigestDouble.java
index 9a93b1c..b1a22e3 100644
--- a/src/main/java/org/apache/datasketches/tdigest/TDigestDouble.java
+++ b/src/main/java/org/apache/datasketches/tdigest/TDigestDouble.java
@@ -20,7 +20,7 @@
package org.apache.datasketches.tdigest;
import java.nio.ByteOrder;
-import java.util.function.Function;
+import java.util.Arrays;
import org.apache.datasketches.common.Family;
import org.apache.datasketches.common.SketchesArgumentException;
@@ -47,7 +47,6 @@
private boolean reverseMerge_;
private final short k_;
- private final short internalK_;
private double minValue_;
private double maxValue_;
private int centroidsCapacity_;
@@ -55,11 +54,10 @@
private double[] centroidMeans_;
private long[] centroidWeights_;
private long centroidsWeight_;
- private int bufferCapacity_;
private int numBuffered_;
private double[] bufferValues_;
- private long[] bufferWeights_;
- private long bufferedWeight_;
+
+ private static final int BUFFER_MULTIPLIER = 4;
private static final byte PREAMBLE_LONGS_EMPTY_OR_SINGLE = 1;
private static final byte PREAMBLE_LONGS_MULTIPLE = 2;
@@ -82,7 +80,7 @@
* @param k affects the size of TDigest and its estimation error
*/
public TDigestDouble(final short k) {
- this(false, k, Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY, null, null, 0);
+ this(false, k, Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY, null, null, 0, null);
}
/**
@@ -98,11 +96,9 @@
*/
public void update(final double value) {
if (Double.isNaN(value)) { return; }
- if (numBuffered_ == bufferCapacity_ - numCentroids_) { mergeBuffered(); }
+ if (numBuffered_ == centroidsCapacity_ * BUFFER_MULTIPLIER) { compress(); }
bufferValues_[numBuffered_] = value;
- bufferWeights_[numBuffered_] = 1;
numBuffered_++;
- bufferedWeight_++;
minValue_ = Math.min(minValue_, value);
maxValue_ = Math.max(maxValue_, value);
}
@@ -114,36 +110,28 @@
public void merge(final TDigestDouble other) {
if (other.isEmpty()) { return; }
final int num = numCentroids_ + numBuffered_ + other.numCentroids_ + other.numBuffered_;
- if (num <= bufferCapacity_) {
- System.arraycopy(other.bufferValues_, 0, bufferValues_, numBuffered_, other.numBuffered_);
- System.arraycopy(other.bufferWeights_, 0, bufferWeights_, numBuffered_, other.numBuffered_);
- numBuffered_ += other.numBuffered_;
- System.arraycopy(other.centroidMeans_, 0, bufferValues_, numBuffered_, other.numCentroids_);
- System.arraycopy(other.centroidWeights_, 0, bufferWeights_, numBuffered_, other.numCentroids_);
- numBuffered_ += other.numCentroids_;
- bufferedWeight_ += other.getTotalWeight();
- minValue_ = Math.min(minValue_, other.minValue_);
- maxValue_ = Math.max(maxValue_, other.maxValue_);
- } else {
- final double[] values = new double[num];
- final long[] weights = new long[num];
- System.arraycopy(bufferValues_, 0, values, 0, numBuffered_);
- System.arraycopy(bufferWeights_, 0, weights, 0, numBuffered_);
- System.arraycopy(other.bufferValues_, 0, values, numBuffered_, other.numBuffered_);
- System.arraycopy(other.bufferWeights_, 0, weights, numBuffered_, other.numBuffered_);
- numBuffered_ += other.numBuffered_;
- System.arraycopy(other.centroidMeans_, 0, values, numBuffered_, other.numCentroids_);
- System.arraycopy(other.centroidWeights_, 0, weights, numBuffered_, other.numCentroids_);
- numBuffered_ += other.numCentroids_;
- merge(values, weights, bufferedWeight_ + other.getTotalWeight(), numBuffered_);
- }
+ final double[] values = new double[num];
+ final long[] weights = new long[num];
+ System.arraycopy(bufferValues_, 0, values, 0, numBuffered_);
+ Arrays.fill(weights, 0, numBuffered_, 1);
+ System.arraycopy(other.bufferValues_, 0, values, numBuffered_, other.numBuffered_);
+ Arrays.fill(weights, numBuffered_, numBuffered_ + other.numBuffered_, 1);
+ System.arraycopy(other.centroidMeans_, 0, values, numBuffered_ + other.numBuffered_, other.numCentroids_);
+ System.arraycopy(other.centroidWeights_, 0, weights, numBuffered_ + other.numBuffered_, other.numCentroids_);
+ merge(values, weights, numBuffered_ + other.getTotalWeight(), numBuffered_ + other.numBuffered_ + other.numCentroids_);
}
/**
* Process buffered values and merge centroids if needed
*/
public void compress() {
- mergeBuffered();
+ if (numBuffered_ == 0) { return; }
+ final int num = numBuffered_ + numCentroids_;
+ final double[] values = new double[num];
+ final long[] weights = new long[num];
+ System.arraycopy(bufferValues_, 0, values, 0, numBuffered_);
+ Arrays.fill(weights, 0, numBuffered_, 1);
+ merge(values, weights, numBuffered_, numBuffered_);
}
/**
@@ -173,7 +161,7 @@
* @return total weight
*/
public long getTotalWeight() {
- return centroidsWeight_ + bufferedWeight_;
+ return centroidsWeight_ + numBuffered_;
}
/**
@@ -188,7 +176,7 @@
if (value > maxValue_) { return 1; }
if (numCentroids_ + numBuffered_ == 1) { return 0.5; }
- mergeBuffered(); // side effect
+ compress(); // side effect
// left tail
final double firstMean = centroidMeans_[0];
@@ -244,7 +232,7 @@
if (Double.isNaN(rank)) { throw new SketchesArgumentException("Operation is undefined for Nan"); }
if (rank < 0 || rank > 1) { throw new SketchesArgumentException("Normalized rank must be within [0, 1]"); }
- mergeBuffered(); // side effect
+ compress(); // side effect
if (numCentroids_ == 1) { return centroidMeans_[0]; }
@@ -293,7 +281,7 @@
* @return size in bytes needed to serialize this tdigest
*/
int getSerializedSizeBytes() {
- mergeBuffered(); // side effect
+ compress(); // side effect
return getPreambleLongs() * Long.BYTES
+ (isEmpty() ? 0 : (isSingleValue() ? Double.BYTES : 2 * Double.BYTES + (Double.BYTES + Long.BYTES) * numCentroids_));
}
@@ -303,7 +291,7 @@
* @return byte array
*/
public byte[] toByteArray() {
- mergeBuffered(); // side effect
+ compress(); // side effect
final byte[] bytes = new byte[getSerializedSizeBytes()];
final WritableBuffer wbuf = WritableMemory.writableWrap(bytes).asWritableBuffer();
wbuf.putByte((byte) getPreambleLongs());
@@ -380,7 +368,7 @@
} else {
value = buff.getDouble();
}
- return new TDigestDouble(reverseMerge, k, value, value, new double[] {value}, new long[] {1}, 1);
+ return new TDigestDouble(reverseMerge, k, value, value, new double[] {value}, new long[] {1}, 1, null);
}
final int numCentroids = buff.getInt();
buff.getInt(); // unused
@@ -401,7 +389,7 @@
weights[i] = isFloat ? buff.getInt() : buff.getLong();
totalWeight += weights[i];
}
- return new TDigestDouble(reverseMerge, k, min, max, means, weights, totalWeight);
+ return new TDigestDouble(reverseMerge, k, min, max, means, weights, totalWeight, null);
}
// compatibility with the format of the reference implementation
@@ -425,7 +413,7 @@
means[i] = buff.getDouble();
totalWeight += weights[i];
}
- return new TDigestDouble(false, k, min, max, means, weights, totalWeight);
+ return new TDigestDouble(false, k, min, max, means, weights, totalWeight, null);
}
// COMPAT_FLOAT: compatibility with asSmallBytes()
final double min = buff.getDouble(); // reference implementation uses doubles for min and max
@@ -443,7 +431,7 @@
means[i] = buff.getFloat();
totalWeight += weights[i];
}
- return new TDigestDouble(false, k, min, max, means, weights, totalWeight);
+ return new TDigestDouble(false, k, min, max, means, weights, totalWeight, null);
}
/**
@@ -464,14 +452,12 @@
final StringBuilder sb = new StringBuilder();
sb.append("MergingDigest").append(LS)
- .append(" Nominal Compression: ").append(k_).append(LS)
- .append(" Internal Compression: ").append(internalK_).append(LS)
+ .append(" Compression: ").append(k_).append(LS)
.append(" Centroids: ").append(numCentroids_).append(LS)
.append(" Buffered: ").append(numBuffered_).append(LS)
.append(" Centroids Capacity: ").append(centroidsCapacity_).append(LS)
- .append(" Buffer Capacity: ").append(bufferCapacity_).append(LS)
+ .append(" Buffer Capacity: ").append(centroidsCapacity_ * BUFFER_MULTIPLIER).append(LS)
.append("Centroids Weight: ").append(centroidsWeight_).append(LS)
- .append(" Buffered Weight: ").append(bufferedWeight_).append(LS)
.append(" Total Weight: ").append(getTotalWeight()).append(LS)
.append(" Reverse Merge: ").append(reverseMerge_).append(LS);
if (!isEmpty()) {
@@ -488,7 +474,7 @@
if (numBuffered_ > 0) {
sb.append("Buffer:").append(LS);
for (int i = 0; i < numBuffered_; i++) {
- sb.append(i).append(": ").append(bufferValues_[i]).append(", ").append(bufferWeights_[i]).append(LS);
+ sb.append(i).append(": ").append(bufferValues_[i]).append(LS);
}
}
}
@@ -496,37 +482,29 @@
}
private TDigestDouble(final boolean reverseMerge, final short k, final double min, final double max,
- final double[] means, final long[] weights, final long weight) {
- reverseMerge_ = reverseMerge;
+ final double[] means, final long[] weights, final long weight, final double[] buffer) {
+ reverseMerge_ = reverseMerge;
k_ = k;
minValue_ = min;
maxValue_ = max;
if (k < 10) { throw new SketchesArgumentException("k must be at least 10"); }
final int fudge = k < 30 ? 30 : 10;
centroidsCapacity_ = k_ * 2 + fudge;
- bufferCapacity_ = centroidsCapacity_ * 5;
- final double scale = Math.max(1.0, (double) bufferCapacity_ / centroidsCapacity_ - 1.0);
- internalK_ = (short) Math.ceil(Math.sqrt(scale) * k_);
- centroidsCapacity_ = Math.max(centroidsCapacity_, internalK_ + fudge);
- bufferCapacity_ = Math.max(bufferCapacity_, centroidsCapacity_ * 2);
centroidMeans_ = new double[centroidsCapacity_];
centroidWeights_ = new long[centroidsCapacity_];
- bufferValues_ = new double[bufferCapacity_];
- bufferWeights_ = new long[bufferCapacity_];
+ bufferValues_ = new double[centroidsCapacity_ * BUFFER_MULTIPLIER];
numCentroids_ = 0;
numBuffered_ = 0;
centroidsWeight_ = weight;
- bufferedWeight_ = 0;
if (means != null && weights != null) {
System.arraycopy(means, 0, centroidMeans_, 0, means.length);
System.arraycopy(weights, 0, centroidWeights_, 0, weights.length);
numCentroids_ = means.length;
}
- }
-
- private void mergeBuffered() {
- if (numBuffered_ == 0) { return; }
- merge(bufferValues_, bufferWeights_, bufferedWeight_, numBuffered_);
+ if (buffer != null) {
+ System.arraycopy(buffer, 0, bufferValues_, 0, buffer.length);
+ numBuffered_ = buffer.length;
+ }
}
// assumes that there is enough room in the input arrays to add centroids from this TDigest
@@ -552,7 +530,7 @@
if (current != 1 && current != num - 1) {
final double q0 = weightSoFar / centroidsWeight_;
final double q2 = (weightSoFar + proposedWeight) / centroidsWeight_;
- final double normalizer = ScaleFunction.normalizer(internalK_, centroidsWeight_);
+ final double normalizer = ScaleFunction.normalizer(k_ * 2, centroidsWeight_);
addThis = proposedWeight <= centroidsWeight_ * Math.min(ScaleFunction.max(q0, normalizer), ScaleFunction.max(q2, normalizer));
}
if (addThis) { // merge into existing centroid
@@ -572,7 +550,6 @@
Sort.reverse(centroidWeights_, numCentroids_);
}
numBuffered_ = 0;
- bufferedWeight_ = 0;
reverseMerge_ = !reverseMerge_;
minValue_ = Math.min(minValue_, centroidMeans_[0]);
maxValue_ = Math.max(maxValue_, centroidMeans_[numCentroids_ - 1]);
@@ -604,7 +581,7 @@
return 4 * Math.log(n / compression) + 24;
}
}
-
+
private static double weightedAverage(final double x1, final double w1, final double x2, final double w2) {
return (x1 * w1 + x2 * w2) / (w1 + w2);
}
diff --git a/src/test/java/org/apache/datasketches/quantiles/CustomQuantilesTest.java b/src/test/java/org/apache/datasketches/quantiles/CustomQuantilesTest.java
index d319388..101c26c 100644
--- a/src/test/java/org/apache/datasketches/quantiles/CustomQuantilesTest.java
+++ b/src/test/java/org/apache/datasketches/quantiles/CustomQuantilesTest.java
@@ -26,6 +26,7 @@
import static org.apache.datasketches.quantilescommon.QuantilesUtil.getNaturalRank;
import static org.testng.Assert.assertEquals;
+import org.apache.datasketches.quantilescommon.DoublesSketchSortedView;
import org.testng.annotations.Test;
public class CustomQuantilesTest {
@@ -55,7 +56,7 @@
}
}
long N = sk.getN();
- DoublesSketchSortedView sv = new DoublesSketchSortedView(sk);
+ DoublesSketchSortedView sv = sk.getSortedView();
double[] quantilesArr = sv.getQuantiles();
long[] cumWtsArr = sv.getCumulativeWeights();
int lenQ = quantilesArr.length;
diff --git a/src/test/java/org/apache/datasketches/quantiles/UtilTest.java b/src/test/java/org/apache/datasketches/quantiles/UtilTest.java
index 0ca3ea4..49546df 100644
--- a/src/test/java/org/apache/datasketches/quantiles/UtilTest.java
+++ b/src/test/java/org/apache/datasketches/quantiles/UtilTest.java
@@ -165,7 +165,6 @@
}
/**
- *
* @param numTries number of tries
* @param maxArrLen maximum length of array size
*/
@@ -178,7 +177,7 @@
arr = makeMergeTestInput(arrLen, blkSize);
long [] brr = makeTheTandemArray(arr);
assertMergeTestPrecondition(arr, brr, arrLen, blkSize);
- DoublesSketchSortedView.blockyTandemMergeSort(arr, brr, arrLen, blkSize);
+ DoublesSketch.blockyTandemMergeSort(arr, brr, arrLen, blkSize);
/* verify sorted order */
for (int i = 0; i < (arrLen-1); i++) {
assert arr[i] <= arr[i+1];
diff --git a/src/test/java/org/apache/datasketches/quantilescommon/CrossCheckQuantilesTest.java b/src/test/java/org/apache/datasketches/quantilescommon/CrossCheckQuantilesTest.java
index dbbbb65..6049f2f 100644
--- a/src/test/java/org/apache/datasketches/quantilescommon/CrossCheckQuantilesTest.java
+++ b/src/test/java/org/apache/datasketches/quantilescommon/CrossCheckQuantilesTest.java
@@ -29,8 +29,7 @@
import static org.apache.datasketches.quantilescommon.LinearRanksAndQuantiles.getTrueItemRank;
import static org.apache.datasketches.quantilescommon.QuantileSearchCriteria.EXCLUSIVE;
import static org.apache.datasketches.quantilescommon.QuantileSearchCriteria.INCLUSIVE;
-import static org.apache.datasketches.quantilescommon.ReflectUtilityTest.CLASSIC_DOUBLES_SV_CTOR;
-import static org.apache.datasketches.quantilescommon.ReflectUtilityTest.KLL_DOUBLES_SV_CTOR;
+import static org.apache.datasketches.quantilescommon.ReflectUtilityTest.DOUBLES_SV_CTOR;
import static org.apache.datasketches.quantilescommon.ReflectUtilityTest.KLL_FLOATS_SV_CTOR;
import static org.apache.datasketches.quantilescommon.ReflectUtilityTest.REQ_SV_CTOR;
import static org.testng.Assert.assertEquals;
@@ -40,13 +39,11 @@
import org.apache.datasketches.common.ArrayOfStringsSerDe;
import org.apache.datasketches.common.SketchesArgumentException;
import org.apache.datasketches.kll.KllDoublesSketch;
-import org.apache.datasketches.kll.KllDoublesSketchSortedView;
import org.apache.datasketches.kll.KllFloatsSketch;
import org.apache.datasketches.kll.KllFloatsSketchSortedView;
import org.apache.datasketches.kll.KllItemsSketch;
import org.apache.datasketches.kll.KllSketch;
import org.apache.datasketches.quantiles.DoublesSketch;
-import org.apache.datasketches.quantiles.DoublesSketchSortedView;
import org.apache.datasketches.quantiles.ItemsSketch;
import org.apache.datasketches.quantiles.UpdateDoublesSketch;
import org.apache.datasketches.req.ReqSketch;
@@ -146,10 +143,10 @@
ReqSketchSortedView reqFloatsSV = null;
KllFloatsSketchSortedView kllFloatsSV = null;
- KllDoublesSketchSortedView kllDoublesSV = null;
+ DoublesSketchSortedView kllDoublesSV = null;
DoublesSketchSortedView classicDoublesSV = null;
ItemsSketchSortedView<String> kllItemsSV = null;
- ItemsSketchSortedView<String> itemsSV = null;
+ ItemsSketchSortedView<String> classicItemsSV = null;
public CrossCheckQuantilesTest() {}
@@ -221,7 +218,7 @@
testRank = kllItemsSk.getRank(s, crit);
assertEquals(testRank, trueRank);
- testRank = itemsSV.getRank(s, crit);
+ testRank = classicItemsSV.getRank(s, crit);
assertEquals(testRank, trueRank);
testRank = itemsSk.getRank(s, crit);
assertEquals(testRank, trueRank);
@@ -286,7 +283,7 @@
testIQ = kllItemsSk.getQuantile(normRank, crit);
assertEquals(testIQ, trueIQ);
- testIQ = itemsSV.getQuantile(normRank, crit);
+ testIQ = classicItemsSV.getQuantile(normRank, crit);
assertEquals(testIQ, trueIQ);
testIQ = itemsSk.getQuantile(normRank, crit);
assertEquals(testIQ, trueIQ);
@@ -339,9 +336,9 @@
svMaxFValues[set], svMinFValues[set]);
kllFloatsSV = getRawKllFloatsSV(svFValues[set], svCumWeights[set], totalN[set],
svMaxFValues[set], svMinFValues[set]);
- kllDoublesSV = getRawKllDoublesSV(svDValues[set], svCumWeights[set], totalN[set],
+ kllDoublesSV = getRawDoublesSV(svDValues[set], svCumWeights[set], totalN[set],
svMaxDValues[set], svMinDValues[set]);
- classicDoublesSV = getRawClassicDoublesSV(svDValues[set], svCumWeights[set], totalN[set],
+ classicDoublesSV = getRawDoublesSV(svDValues[set], svCumWeights[set], totalN[set],
svMaxDValues[set], svMinDValues[set]);
String svImax = svIValues[set][svIValues[set].length - 1];
String svImin = svIValues[set][0];
@@ -349,7 +346,7 @@
kllItemsSV = new ItemsSketchSortedView<>(svIValues[set], svCumWeights[set], totalN[set],
comparator, svImax, svImin, normRankErr);
normRankErr = ItemsSketch.getNormalizedRankError(k, true);
- itemsSV = new ItemsSketchSortedView<>(svIValues[set], svCumWeights[set], totalN[set],
+ classicItemsSV = new ItemsSketchSortedView<>(svIValues[set], svCumWeights[set], totalN[set],
comparator, svImax, svImin, normRankErr);
}
@@ -365,16 +362,10 @@
return (KllFloatsSketchSortedView) KLL_FLOATS_SV_CTOR.newInstance(values, cumWeights, totalN, maxItem, minItem);
}
- private final static KllDoublesSketchSortedView getRawKllDoublesSV(
+ private final static DoublesSketchSortedView getRawDoublesSV(
final double[] values, final long[] cumWeights, final long totalN, final double maxItem, final double minItem)
throws Exception {
- return (KllDoublesSketchSortedView) KLL_DOUBLES_SV_CTOR.newInstance(values, cumWeights, totalN, maxItem, minItem);
- }
-
- private final static DoublesSketchSortedView getRawClassicDoublesSV(
- final double[] values, final long[] cumWeights, final long totalN, final double maxItem, final double minItem)
- throws Exception {
- return (DoublesSketchSortedView) CLASSIC_DOUBLES_SV_CTOR.newInstance(values, cumWeights, totalN, maxItem, minItem);
+ return (DoublesSketchSortedView) DOUBLES_SV_CTOR.newInstance(values, cumWeights, totalN, maxItem, minItem);
}
/********BUILD DATA SETS**********/
diff --git a/src/test/java/org/apache/datasketches/quantilescommon/ReflectUtilityTest.java b/src/test/java/org/apache/datasketches/quantilescommon/ReflectUtilityTest.java
index 191629f..6dbcac2 100644
--- a/src/test/java/org/apache/datasketches/quantilescommon/ReflectUtilityTest.java
+++ b/src/test/java/org/apache/datasketches/quantilescommon/ReflectUtilityTest.java
@@ -36,28 +36,23 @@
static final Class<?> REQ_SV;
static final Class<?> KLL_FLOATS_SV;
- static final Class<?> KLL_DOUBLES_SV;
- static final Class<?> CLASSIC_DOUBLES_SV;
+ static final Class<?> DOUBLES_SV;
static final Constructor<?> REQ_SV_CTOR;
static final Constructor<?> KLL_FLOATS_SV_CTOR;
- static final Constructor<?> KLL_DOUBLES_SV_CTOR;
- static final Constructor<?> CLASSIC_DOUBLES_SV_CTOR;
+ static final Constructor<?> DOUBLES_SV_CTOR;
static {
REQ_SV = getClass("org.apache.datasketches.req.ReqSketchSortedView");
KLL_FLOATS_SV = getClass("org.apache.datasketches.kll.KllFloatsSketchSortedView");
- KLL_DOUBLES_SV = getClass("org.apache.datasketches.kll.KllDoublesSketchSortedView");
- CLASSIC_DOUBLES_SV = getClass("org.apache.datasketches.quantiles.DoublesSketchSortedView");
+ DOUBLES_SV = getClass("org.apache.datasketches.quantilescommon.DoublesSketchSortedView");
REQ_SV_CTOR =
getConstructor(REQ_SV, float[].class, long[].class, long.class, float.class, float.class);
KLL_FLOATS_SV_CTOR =
getConstructor(KLL_FLOATS_SV, float[].class, long[].class, long.class, float.class, float.class);
- KLL_DOUBLES_SV_CTOR =
- getConstructor(KLL_DOUBLES_SV, double[].class, long[].class, long.class, double.class, double.class);
- CLASSIC_DOUBLES_SV_CTOR =
- getConstructor(CLASSIC_DOUBLES_SV, double[].class, long[].class, long.class, double.class, double.class);
+ DOUBLES_SV_CTOR =
+ getConstructor(DOUBLES_SV, double[].class, long[].class, long.class, double.class, double.class);
}
@Test //Example