Merge pull request #533 from romseygeek/sketch-bytebuffers-update

Allow updating sketches with ByteBuffers
diff --git a/src/main/java/org/apache/datasketches/hllmap/UniqueCountMap.java b/src/main/java/org/apache/datasketches/hllmap/UniqueCountMap.java
index 9a480d9..431920d 100644
--- a/src/main/java/org/apache/datasketches/hllmap/UniqueCountMap.java
+++ b/src/main/java/org/apache/datasketches/hllmap/UniqueCountMap.java
@@ -36,6 +36,7 @@
  * <p>The space consumed by this map is quite sensitive to the actual distribution of identifiers
  * per key, so you should characterize and or experiment with your typical input streams.
  * Nonetheless, our experiments on live streams of over 100M keys required about 1.4GB of space.
+ * This is about 14 bytes per key for key storage and unique count storage.
  *
  * <p>Given such highly-skewed distributions, using this map is far more efficient space-wise than
  * the alternative of dedicating an HLL sketch per key. Based on our use cases, after
diff --git a/src/main/java/org/apache/datasketches/kll/KllDoublesSketch.java b/src/main/java/org/apache/datasketches/kll/KllDoublesSketch.java
index 8883734..ab241f7 100644
--- a/src/main/java/org/apache/datasketches/kll/KllDoublesSketch.java
+++ b/src/main/java/org/apache/datasketches/kll/KllDoublesSketch.java
@@ -25,6 +25,7 @@
 import static org.apache.datasketches.kll.KllSketch.SketchStructure.UPDATABLE;
 import static org.apache.datasketches.kll.KllSketch.SketchType.DOUBLES_SKETCH;
 
+import java.util.Arrays;
 import java.util.Objects;
 
 import org.apache.datasketches.common.ArrayOfItemsSerDe;
@@ -35,7 +36,7 @@
 import org.apache.datasketches.memory.Memory;
 import org.apache.datasketches.memory.MemoryRequestServer;
 import org.apache.datasketches.memory.WritableMemory;
-import org.apache.datasketches.quantilescommon.DoublesSortedView;
+import org.apache.datasketches.quantilescommon.DoublesSketchSortedView;
 import org.apache.datasketches.quantilescommon.QuantileSearchCriteria;
 import org.apache.datasketches.quantilescommon.QuantilesDoublesAPI;
 import org.apache.datasketches.quantilescommon.QuantilesDoublesSketchIterator;
@@ -46,7 +47,7 @@
  * @see org.apache.datasketches.kll.KllSketch
  */
 public abstract class KllDoublesSketch extends KllSketch implements QuantilesDoublesAPI {
-  private KllDoublesSketchSortedView kllDoublesSV = null;
+  private DoublesSketchSortedView doublesSV = null;
   final static int ITEM_BYTES = Double.BYTES;
 
   KllDoublesSketch(
@@ -171,21 +172,21 @@
   public double[] getCDF(final double[] splitPoints, final QuantileSearchCriteria searchCrit) {
     if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
     refreshSortedView();
-    return kllDoublesSV.getCDF(splitPoints, searchCrit);
+    return doublesSV.getCDF(splitPoints, searchCrit);
   }
 
   @Override
   public double[] getPMF(final double[] splitPoints, final QuantileSearchCriteria searchCrit) {
     if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
     refreshSortedView();
-    return kllDoublesSV.getPMF(splitPoints, searchCrit);
+    return doublesSV.getPMF(splitPoints, searchCrit);
   }
 
   @Override
   public double getQuantile(final double rank, final QuantileSearchCriteria searchCrit) {
     if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
     refreshSortedView();
-    return kllDoublesSV.getQuantile(rank, searchCrit);
+    return doublesSV.getQuantile(rank, searchCrit);
   }
 
   @Override
@@ -195,7 +196,7 @@
     final int len = ranks.length;
     final double[] quantiles = new double[len];
     for (int i = 0; i < len; i++) {
-      quantiles[i] = kllDoublesSV.getQuantile(ranks[i], searchCrit);
+      quantiles[i] = doublesSV.getQuantile(ranks[i], searchCrit);
     }
     return quantiles;
   }
@@ -224,7 +225,7 @@
   public double getRank(final double quantile, final QuantileSearchCriteria searchCrit) {
     if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
     refreshSortedView();
-    return kllDoublesSV.getRank(quantile, searchCrit);
+    return doublesSV.getRank(quantile, searchCrit);
   }
 
   /**
@@ -254,17 +255,17 @@
     final int len = quantiles.length;
     final double[] ranks = new double[len];
     for (int i = 0; i < len; i++) {
-      ranks[i] = kllDoublesSV.getRank(quantiles[i], searchCrit);
+      ranks[i] = doublesSV.getRank(quantiles[i], searchCrit);
     }
     return ranks;
   }
 
   @Override
   @SuppressFBWarnings(value = "EI_EXPOSE_REP", justification = "OK in this case.")
-  public DoublesSortedView getSortedView() {
+  public DoublesSketchSortedView getSortedView() {
     if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
     refreshSortedView();
-    return kllDoublesSV;
+    return doublesSV;
   }
 
   @Override
@@ -278,9 +279,9 @@
     if (readOnly || sketchStructure != UPDATABLE) { throw new SketchesArgumentException(TGT_IS_READ_ONLY_MSG); }
     if (this == other) { throw new SketchesArgumentException(SELF_MERGE_MSG); }
     final KllDoublesSketch othDblSk = (KllDoublesSketch)other;
-    if (othDblSk.isEmpty()) { return; } //then check empty
+    if (othDblSk.isEmpty()) { return; }
     KllDoublesHelper.mergeDoubleImpl(this, othDblSk);
-    kllDoublesSV = null;
+    doublesSV = null;
   }
 
   /**
@@ -299,7 +300,7 @@
     setMinItem(Double.NaN);
     setMaxItem(Double.NaN);
     setDoubleItemsArray(new double[k]);
-    kllDoublesSV = null;
+    doublesSV = null;
   }
 
   @Override
@@ -323,7 +324,7 @@
     if (Double.isNaN(item)) { return; } //ignore
     if (readOnly) { throw new SketchesArgumentException(TGT_IS_READ_ONLY_MSG); }
     KllDoublesHelper.updateDouble(this, item);
-    kllDoublesSV = null;
+    doublesSV = null;
   }
 
   /**
@@ -337,7 +338,7 @@
     if (weight < 1L) { throw new SketchesArgumentException("Weight is less than one."); }
     if (weight == 1L) { KllDoublesHelper.updateDouble(this, item); }
     else { KllDoublesHelper.updateDouble(this, item, weight); }
-    kllDoublesSV = null;
+    doublesSV = null;
   }
 
   //restricted
@@ -393,11 +394,6 @@
     return levelsArr[getNumLevels()] * Double.BYTES;
   }
 
-  private final void refreshSortedView() {
-    kllDoublesSV = (kllDoublesSV == null)
-        ? new KllDoublesSketchSortedView(this) : kllDoublesSV;
-  }
-
   abstract void setDoubleItemsArray(double[] doubleItems);
 
   abstract void setDoubleItemsArrayAt(int index, double item);
@@ -416,4 +412,135 @@
     }
   }
 
+  private final DoublesSketchSortedView refreshSortedView() {
+    if (doublesSV == null) {
+      final CreateSortedView csv = new CreateSortedView();
+      doublesSV = csv.getSV();
+    }
+    return doublesSV;
+  }
+
+  private final class CreateSortedView {
+    double[] quantiles;
+    long[] cumWeights;
+
+    DoublesSketchSortedView getSV() {
+      if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
+      if (getN() == 0) { throw new SketchesArgumentException(EMPTY_MSG); }
+      final double[] srcQuantiles = getDoubleItemsArray();
+      final int[] srcLevels = levelsArr;
+      final int srcNumLevels = getNumLevels();
+
+      if (!isLevelZeroSorted()) {
+        Arrays.sort(srcQuantiles, srcLevels[0], srcLevels[1]);
+        if (!hasMemory()) { setLevelZeroSorted(true); }
+      }
+      final int numQuantiles = getNumRetained();
+      quantiles = new double[numQuantiles];
+      cumWeights = new long[numQuantiles];
+      populateFromSketch(srcQuantiles, srcLevels, srcNumLevels, numQuantiles);
+      return new DoublesSketchSortedView(
+          quantiles, cumWeights, getN(), getMaxItem(), getMinItem());
+    }
+
+    private void populateFromSketch(final double[] srcQuantiles, final int[] srcLevels,
+      final int srcNumLevels, final int numItems) {
+      final int[] myLevels = new int[srcNumLevels + 1];
+      final int offset = srcLevels[0];
+      System.arraycopy(srcQuantiles, offset, quantiles, 0, numItems);
+      int srcLevel = 0;
+      int dstLevel = 0;
+      long weight = 1;
+      while (srcLevel < srcNumLevels) {
+        final int fromIndex = srcLevels[srcLevel] - offset;
+        final int toIndex = srcLevels[srcLevel + 1] - offset; // exclusive
+        if (fromIndex < toIndex) { // if equal, skip empty level
+          Arrays.fill(cumWeights, fromIndex, toIndex, weight);
+          myLevels[dstLevel] = fromIndex;
+          myLevels[dstLevel + 1] = toIndex;
+          dstLevel++;
+        }
+        srcLevel++;
+        weight *= 2;
+      }
+      final int numLevels = dstLevel;
+      blockyTandemMergeSort(quantiles, cumWeights, myLevels, numLevels); //create unit weights
+      KllHelper.convertToCumulative(cumWeights);
+    }
+  } //End of class CreateSortedView
+
+  private static void blockyTandemMergeSort(final double[] quantiles, final long[] weights,
+      final int[] levels, final int numLevels) {
+    if (numLevels == 1) { return; }
+
+    // duplicate the input in preparation for the "ping-pong" copy reduction strategy.
+    final double[] quantilesTmp = Arrays.copyOf(quantiles, quantiles.length);
+    final long[] weightsTmp = Arrays.copyOf(weights, quantiles.length); // don't need the extra one
+
+    blockyTandemMergeSortRecursion(quantilesTmp, weightsTmp, quantiles, weights, levels, 0, numLevels);
+  }
+
+  private static void blockyTandemMergeSortRecursion(
+      final double[] quantilesSrc, final long[] weightsSrc,
+      final double[] quantilesDst, final long[] weightsDst,
+      final int[] levels, final int startingLevel, final int numLevels) {
+    if (numLevels == 1) { return; }
+    final int numLevels1 = numLevels / 2;
+    final int numLevels2 = numLevels - numLevels1;
+    assert numLevels1 >= 1;
+    assert numLevels2 >= numLevels1;
+    final int startingLevel1 = startingLevel;
+    final int startingLevel2 = startingLevel + numLevels1;
+    // swap roles of src and dst
+    blockyTandemMergeSortRecursion(
+        quantilesDst, weightsDst,
+        quantilesSrc, weightsSrc,
+        levels, startingLevel1, numLevels1);
+    blockyTandemMergeSortRecursion(
+        quantilesDst, weightsDst,
+        quantilesSrc, weightsSrc,
+        levels, startingLevel2, numLevels2);
+    tandemMerge(
+        quantilesSrc, weightsSrc,
+        quantilesDst, weightsDst,
+        levels,
+        startingLevel1, numLevels1,
+        startingLevel2, numLevels2);
+  }
+
+  private static void tandemMerge(
+      final double[] quantilesSrc, final long[] weightsSrc,
+      final double[] quantilesDst, final long[] weightsDst,
+      final int[] levelStarts,
+      final int startingLevel1, final int numLevels1,
+      final int startingLevel2, final int numLevels2) {
+    final int fromIndex1 = levelStarts[startingLevel1];
+    final int toIndex1 = levelStarts[startingLevel1 + numLevels1]; // exclusive
+    final int fromIndex2 = levelStarts[startingLevel2];
+    final int toIndex2 = levelStarts[startingLevel2 + numLevels2]; // exclusive
+    int iSrc1 = fromIndex1;
+    int iSrc2 = fromIndex2;
+    int iDst = fromIndex1;
+
+    while (iSrc1 < toIndex1 && iSrc2 < toIndex2) {
+      if (quantilesSrc[iSrc1] < quantilesSrc[iSrc2]) {
+        quantilesDst[iDst] = quantilesSrc[iSrc1];
+        weightsDst[iDst] = weightsSrc[iSrc1];
+        iSrc1++;
+      } else {
+        quantilesDst[iDst] = quantilesSrc[iSrc2];
+        weightsDst[iDst] = weightsSrc[iSrc2];
+        iSrc2++;
+      }
+      iDst++;
+    }
+    if (iSrc1 < toIndex1) {
+      System.arraycopy(quantilesSrc, iSrc1, quantilesDst, iDst, toIndex1 - iSrc1);
+      System.arraycopy(weightsSrc, iSrc1, weightsDst, iDst, toIndex1 - iSrc1);
+    } else if (iSrc2 < toIndex2) {
+      System.arraycopy(quantilesSrc, iSrc2, quantilesDst, iDst, toIndex2 - iSrc2);
+      System.arraycopy(weightsSrc, iSrc2, weightsDst, iDst, toIndex2 - iSrc2);
+    }
+  }
+
 }
diff --git a/src/main/java/org/apache/datasketches/kll/KllDoublesSketchSortedView.java b/src/main/java/org/apache/datasketches/kll/KllDoublesSketchSortedView.java
deleted file mode 100644
index 13f0a9d..0000000
--- a/src/main/java/org/apache/datasketches/kll/KllDoublesSketchSortedView.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.datasketches.kll;
-
-import static org.apache.datasketches.quantilescommon.QuantileSearchCriteria.INCLUSIVE;
-import static org.apache.datasketches.quantilescommon.QuantilesAPI.EMPTY_MSG;
-import static org.apache.datasketches.quantilescommon.QuantilesUtil.getNaturalRank;
-
-import java.util.Arrays;
-
-import org.apache.datasketches.common.SketchesArgumentException;
-import org.apache.datasketches.quantilescommon.DoublesSortedView;
-import org.apache.datasketches.quantilescommon.DoublesSortedViewIterator;
-import org.apache.datasketches.quantilescommon.InequalitySearch;
-import org.apache.datasketches.quantilescommon.QuantileSearchCriteria;
-import org.apache.datasketches.quantilescommon.QuantilesUtil;
-
-/**
- * The SortedView of the KllDoublesSketch.
- * @author Alexander Saydakov
- * @author Lee Rhodes
- */
-public final class KllDoublesSketchSortedView implements DoublesSortedView {
-  private final double[] quantiles;
-  private final long[] cumWeights; //comes in as individual weights, converted to cumulative natural weights
-  private final long totalN;
-  private final double maxItem;
-  private final double minItem;
-
-  /**
-   * Construct from elements for testing.
-   * @param quantiles sorted array of quantiles
-   * @param cumWeights sorted, monotonically increasing cumulative weights.
-   * @param totalN the total number of items presented to the sketch.
-   */
-  KllDoublesSketchSortedView(final double[] quantiles, final long[] cumWeights, final long totalN,
-      final double maxItem, final double minItem) {
-    this.quantiles = quantiles;
-    this.cumWeights  = cumWeights;
-    this.totalN = totalN;
-    this.maxItem = maxItem;
-    this.minItem = minItem;
-  }
-
-  /**
-   * Constructs this Sorted View given the sketch
-   * @param sketch the given KllDoublesSketch.
-   */
-  public KllDoublesSketchSortedView(final KllDoublesSketch sketch) {
-    if (sketch.isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
-    this.totalN = sketch.getN();
-    this.maxItem = sketch.getMaxItem();
-    this.minItem = sketch.getMinItem();
-    final double[] srcQuantiles = sketch.getDoubleItemsArray();
-    final int[] srcLevels = sketch.levelsArr;
-    final int srcNumLevels = sketch.getNumLevels();
-
-    if (!sketch.isLevelZeroSorted()) {
-      Arrays.sort(srcQuantiles, srcLevels[0], srcLevels[1]);
-      if (!sketch.hasMemory()) { sketch.setLevelZeroSorted(true); }
-    }
-
-    final int numQuantiles = srcLevels[srcNumLevels] - srcLevels[0]; //remove free space
-    quantiles = new double[numQuantiles];
-    cumWeights = new long[numQuantiles];
-    populateFromSketch(srcQuantiles, srcLevels, srcNumLevels, numQuantiles);
-  }
-
-  @Override
-  public long[] getCumulativeWeights() {
-    return cumWeights.clone();
-  }
-
-  @Override
-  public double getMaxItem() {
-    return maxItem;
-  }
-
-  @Override
-  public double getMinItem() {
-    return minItem;
-  }
-
-  @Override
-  public long getN() {
-    return totalN;
-  }
-
-  @Override
-  public double getQuantile(final double rank, final QuantileSearchCriteria searchCrit) {
-    if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
-    QuantilesUtil.checkNormalizedRankBounds(rank);
-    final int len = cumWeights.length;
-    final double naturalRank = getNaturalRank(rank, totalN, searchCrit);
-    final InequalitySearch crit = (searchCrit == INCLUSIVE) ? InequalitySearch.GE : InequalitySearch.GT;
-    final int index = InequalitySearch.find(cumWeights, 0, len - 1, naturalRank, crit);
-    if (index == -1) {
-      return quantiles[len - 1]; //EXCLUSIVE (GT) case: normRank == 1.0;
-    }
-    return quantiles[index];
-  }
-
-  @Override
-  public double[] getQuantiles() {
-    return quantiles.clone();
-  }
-
-  @Override
-  public double getRank(final double quantile, final QuantileSearchCriteria searchCrit) {
-    if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
-    final int len = quantiles.length;
-    final InequalitySearch crit = (searchCrit == INCLUSIVE) ? InequalitySearch.LE : InequalitySearch.LT;
-    final int index = InequalitySearch.find(quantiles,  0, len - 1, quantile, crit);
-    if (index == -1) {
-      return 0; //EXCLUSIVE (LT) case: quantile <= minQuantile; INCLUSIVE (LE) case: quantile < minQuantile
-    }
-    return (double)cumWeights[index] / totalN;
-  }
-
-  @Override
-  public boolean isEmpty() {
-    return totalN == 0;
-  }
-
-  @Override
-  public DoublesSortedViewIterator iterator() {
-    return new DoublesSortedViewIterator(quantiles, cumWeights);
-  }
-
-  //restricted methods
-
-  private void populateFromSketch(final double[] srcQuantiles, final int[] srcLevels,
-    final int srcNumLevels, final int numItems) {
-    final int[] myLevels = new int[srcNumLevels + 1];
-    final int offset = srcLevels[0];
-    System.arraycopy(srcQuantiles, offset, quantiles, 0, numItems);
-    int srcLevel = 0;
-    int dstLevel = 0;
-    long weight = 1;
-    while (srcLevel < srcNumLevels) {
-      final int fromIndex = srcLevels[srcLevel] - offset;
-      final int toIndex = srcLevels[srcLevel + 1] - offset; // exclusive
-      if (fromIndex < toIndex) { // if equal, skip empty level
-        Arrays.fill(cumWeights, fromIndex, toIndex, weight);
-        myLevels[dstLevel] = fromIndex;
-        myLevels[dstLevel + 1] = toIndex;
-        dstLevel++;
-      }
-      srcLevel++;
-      weight *= 2;
-    }
-    final int numLevels = dstLevel;
-    blockyTandemMergeSort(quantiles, cumWeights, myLevels, numLevels); //create unit weights
-    KllHelper.convertToCumulative(cumWeights);
-  }
-
-  private static void blockyTandemMergeSort(final double[] quantiles, final long[] weights,
-      final int[] levels, final int numLevels) {
-    if (numLevels == 1) { return; }
-
-    // duplicate the input in preparation for the "ping-pong" copy reduction strategy.
-    final double[] quantilesTmp = Arrays.copyOf(quantiles, quantiles.length);
-    final long[] weightsTmp = Arrays.copyOf(weights, quantiles.length); // don't need the extra one
-
-    blockyTandemMergeSortRecursion(quantilesTmp, weightsTmp, quantiles, weights, levels, 0, numLevels);
-  }
-
-  private static void blockyTandemMergeSortRecursion(
-      final double[] quantilesSrc, final long[] weightsSrc,
-      final double[] quantilesDst, final long[] weightsDst,
-      final int[] levels, final int startingLevel, final int numLevels) {
-    if (numLevels == 1) { return; }
-    final int numLevels1 = numLevels / 2;
-    final int numLevels2 = numLevels - numLevels1;
-    assert numLevels1 >= 1;
-    assert numLevels2 >= numLevels1;
-    final int startingLevel1 = startingLevel;
-    final int startingLevel2 = startingLevel + numLevels1;
-    // swap roles of src and dst
-    blockyTandemMergeSortRecursion(
-        quantilesDst, weightsDst,
-        quantilesSrc, weightsSrc,
-        levels, startingLevel1, numLevels1);
-    blockyTandemMergeSortRecursion(
-        quantilesDst, weightsDst,
-        quantilesSrc, weightsSrc,
-        levels, startingLevel2, numLevels2);
-    tandemMerge(
-        quantilesSrc, weightsSrc,
-        quantilesDst, weightsDst,
-        levels,
-        startingLevel1, numLevels1,
-        startingLevel2, numLevels2);
-  }
-
-  private static void tandemMerge(
-      final double[] quantilesSrc, final long[] weightsSrc,
-      final double[] quantilesDst, final long[] weightsDst,
-      final int[] levelStarts,
-      final int startingLevel1, final int numLevels1,
-      final int startingLevel2, final int numLevels2) {
-    final int fromIndex1 = levelStarts[startingLevel1];
-    final int toIndex1 = levelStarts[startingLevel1 + numLevels1]; // exclusive
-    final int fromIndex2 = levelStarts[startingLevel2];
-    final int toIndex2 = levelStarts[startingLevel2 + numLevels2]; // exclusive
-    int iSrc1 = fromIndex1;
-    int iSrc2 = fromIndex2;
-    int iDst = fromIndex1;
-
-    while (iSrc1 < toIndex1 && iSrc2 < toIndex2) {
-      if (quantilesSrc[iSrc1] < quantilesSrc[iSrc2]) {
-        quantilesDst[iDst] = quantilesSrc[iSrc1];
-        weightsDst[iDst] = weightsSrc[iSrc1];
-        iSrc1++;
-      } else {
-        quantilesDst[iDst] = quantilesSrc[iSrc2];
-        weightsDst[iDst] = weightsSrc[iSrc2];
-        iSrc2++;
-      }
-      iDst++;
-    }
-    if (iSrc1 < toIndex1) {
-      System.arraycopy(quantilesSrc, iSrc1, quantilesDst, iDst, toIndex1 - iSrc1);
-      System.arraycopy(weightsSrc, iSrc1, weightsDst, iDst, toIndex1 - iSrc1);
-    } else if (iSrc2 < toIndex2) {
-      System.arraycopy(quantilesSrc, iSrc2, quantilesDst, iDst, toIndex2 - iSrc2);
-      System.arraycopy(weightsSrc, iSrc2, weightsDst, iDst, toIndex2 - iSrc2);
-    }
-  }
-
-}
diff --git a/src/main/java/org/apache/datasketches/kll/KllItemsSketch.java b/src/main/java/org/apache/datasketches/kll/KllItemsSketch.java
index 273a89b..9f5a5ae 100644
--- a/src/main/java/org/apache/datasketches/kll/KllItemsSketch.java
+++ b/src/main/java/org/apache/datasketches/kll/KllItemsSketch.java
@@ -242,7 +242,6 @@
   public final ItemsSketchSortedView<T> getSortedView() {
     if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
     return refreshSortedView();
-    //return itemsSV; //SpotBugs EI_EXPOSE_REP, Suppressed by FindBugsExcludeFilter
   }
 
   @Override
@@ -454,7 +453,7 @@
         blockyTandemMergeSort(quantiles, cumWeights, myLevels, numLevels, comparator); //create unit weights
         KllHelper.convertToCumulative(cumWeights);
       }
-  }
+  } //End of class CreateSortedView
 
     private static <T> void blockyTandemMergeSort(final Object[] quantiles, final long[] weights,
         final int[] levels, final int numLevels, final Comparator<? super T> comp) {
diff --git a/src/main/java/org/apache/datasketches/quantiles/CompactDoublesSketch.java b/src/main/java/org/apache/datasketches/quantiles/CompactDoublesSketch.java
index 81c9e6d..f6df9e8 100644
--- a/src/main/java/org/apache/datasketches/quantiles/CompactDoublesSketch.java
+++ b/src/main/java/org/apache/datasketches/quantiles/CompactDoublesSketch.java
@@ -23,7 +23,7 @@
 import org.apache.datasketches.memory.Memory;
 
 /**
- * Compact sketches are inherently <i>read ony</i>.
+ * Compact sketches are inherently <i>read only</i>.
  * @author Jon Malkin
  */
 public abstract class CompactDoublesSketch extends DoublesSketch {
diff --git a/src/main/java/org/apache/datasketches/quantiles/DirectDoublesSketchAccessor.java b/src/main/java/org/apache/datasketches/quantiles/DirectDoublesSketchAccessor.java
index 172ad14..b95c9f1 100644
--- a/src/main/java/org/apache/datasketches/quantiles/DirectDoublesSketchAccessor.java
+++ b/src/main/java/org/apache/datasketches/quantiles/DirectDoublesSketchAccessor.java
@@ -27,6 +27,7 @@
  * @author Jon Malkin
  */
 final class DirectDoublesSketchAccessor extends DoublesSketchAccessor {
+
   DirectDoublesSketchAccessor(final DoublesSketch ds,
                               final boolean forceSize,
                               final int level) {
diff --git a/src/main/java/org/apache/datasketches/quantiles/DirectUpdateDoublesSketch.java b/src/main/java/org/apache/datasketches/quantiles/DirectUpdateDoublesSketch.java
index 384b426..96c01d9 100644
--- a/src/main/java/org/apache/datasketches/quantiles/DirectUpdateDoublesSketch.java
+++ b/src/main/java/org/apache/datasketches/quantiles/DirectUpdateDoublesSketch.java
@@ -193,7 +193,7 @@
       //bit pattern on direct is always derived, no need to save it.
     }
     putN(newN);
-    classicQdsSV = null;
+    doublesSV = null;
   }
 
   @Override
diff --git a/src/main/java/org/apache/datasketches/quantiles/DoublesArrayAccessor.java b/src/main/java/org/apache/datasketches/quantiles/DoublesArrayAccessor.java
index dd740fb..7fcf38a 100644
--- a/src/main/java/org/apache/datasketches/quantiles/DoublesArrayAccessor.java
+++ b/src/main/java/org/apache/datasketches/quantiles/DoublesArrayAccessor.java
@@ -67,8 +67,7 @@
   }
 
   @Override
-  void putArray(final double[] srcArray, final int srcIndex,
-                       final int dstIndex, final int numItems) {
+  void putArray(final double[] srcArray, final int srcIndex, final int dstIndex, final int numItems) {
     System.arraycopy(srcArray, srcIndex, buffer_, dstIndex, numItems);
   }
 
diff --git a/src/main/java/org/apache/datasketches/quantiles/DoublesBufferAccessor.java b/src/main/java/org/apache/datasketches/quantiles/DoublesBufferAccessor.java
index c3b0f9b..1014a10 100644
--- a/src/main/java/org/apache/datasketches/quantiles/DoublesBufferAccessor.java
+++ b/src/main/java/org/apache/datasketches/quantiles/DoublesBufferAccessor.java
@@ -23,6 +23,7 @@
  * @author Jon Malkin
  */
 abstract class DoublesBufferAccessor {
+
   abstract double get(final int index);
 
   abstract double set(final int index, final double quantile);
@@ -31,6 +32,5 @@
 
   abstract double[] getArray(int fromIdx, int numItems);
 
-  abstract void putArray(double[] srcArray, int srcIndex,
-                         int dstIndex, int numItems);
+  abstract void putArray(double[] srcArray, int srcIndex, int dstIndex, int numItems);
 }
diff --git a/src/main/java/org/apache/datasketches/quantiles/DoublesSketch.java b/src/main/java/org/apache/datasketches/quantiles/DoublesSketch.java
index 93858c3..fd0698b 100644
--- a/src/main/java/org/apache/datasketches/quantiles/DoublesSketch.java
+++ b/src/main/java/org/apache/datasketches/quantiles/DoublesSketch.java
@@ -21,6 +21,7 @@
 
 import static java.lang.Math.max;
 import static java.lang.Math.min;
+import static java.lang.System.arraycopy;
 import static org.apache.datasketches.common.Util.ceilingPowerOf2;
 import static org.apache.datasketches.quantiles.ClassicUtil.MAX_PRELONGS;
 import static org.apache.datasketches.quantiles.ClassicUtil.MIN_K;
@@ -28,13 +29,16 @@
 import static org.apache.datasketches.quantiles.ClassicUtil.checkK;
 import static org.apache.datasketches.quantiles.ClassicUtil.computeNumLevelsNeeded;
 import static org.apache.datasketches.quantiles.ClassicUtil.computeRetainedItems;
+import static org.apache.datasketches.quantiles.DoublesSketchAccessor.BB_LVL_IDX;
 
+import java.util.Arrays;
 import java.util.Random;
 
 import org.apache.datasketches.common.SketchesArgumentException;
+import org.apache.datasketches.common.SketchesStateException;
 import org.apache.datasketches.memory.Memory;
 import org.apache.datasketches.memory.WritableMemory;
-import org.apache.datasketches.quantilescommon.DoublesSortedView;
+import org.apache.datasketches.quantilescommon.DoublesSketchSortedView;
 import org.apache.datasketches.quantilescommon.QuantileSearchCriteria;
 import org.apache.datasketches.quantilescommon.QuantilesAPI;
 import org.apache.datasketches.quantilescommon.QuantilesDoublesAPI;
@@ -108,7 +112,7 @@
    */
   final int k_;
 
-  DoublesSketchSortedView classicQdsSV = null;
+  DoublesSketchSortedView doublesSV = null;
 
   DoublesSketch(final int k) {
     checkK(k);
@@ -160,7 +164,7 @@
   public double[] getCDF(final double[] splitPoints, final QuantileSearchCriteria searchCrit) {
   if (isEmpty()) { throw new IllegalArgumentException(QuantilesAPI.EMPTY_MSG); }
     refreshSortedView();
-    return classicQdsSV.getCDF(splitPoints, searchCrit);
+    return doublesSV.getCDF(splitPoints, searchCrit);
   }
 
   @Override
@@ -173,14 +177,14 @@
   public double[] getPMF(final double[] splitPoints, final QuantileSearchCriteria searchCrit) {
   if (isEmpty()) { throw new IllegalArgumentException(QuantilesAPI.EMPTY_MSG); }
     refreshSortedView();
-    return classicQdsSV.getPMF(splitPoints, searchCrit);
+    return doublesSV.getPMF(splitPoints, searchCrit);
   }
 
   @Override
   public double getQuantile(final double rank, final QuantileSearchCriteria searchCrit) {
   if (isEmpty()) { throw new IllegalArgumentException(QuantilesAPI.EMPTY_MSG); }
     refreshSortedView();
-    return classicQdsSV.getQuantile(rank, searchCrit);
+    return doublesSV.getQuantile(rank, searchCrit);
   }
 
   @Override
@@ -190,7 +194,7 @@
     final int len = ranks.length;
     final double[] quantiles = new double[len];
     for (int i = 0; i < len; i++) {
-      quantiles[i] = classicQdsSV.getQuantile(ranks[i], searchCrit);
+      quantiles[i] = doublesSV.getQuantile(ranks[i], searchCrit);
     }
     return quantiles;
   }
@@ -219,7 +223,7 @@
   public double getRank(final double quantile, final QuantileSearchCriteria searchCrit) {
     if (isEmpty()) { throw new IllegalArgumentException(QuantilesAPI.EMPTY_MSG); }
     refreshSortedView();
-    return classicQdsSV.getRank(quantile, searchCrit);
+    return doublesSV.getRank(quantile, searchCrit);
   }
 
   /**
@@ -249,7 +253,7 @@
     final int len = quantiles.length;
     final double[] ranks = new double[len];
     for (int i = 0; i < len; i++) {
-      ranks[i] = classicQdsSV.getRank(quantiles[i], searchCrit);
+      ranks[i] = doublesSV.getRank(quantiles[i], searchCrit);
     }
     return ranks;
   }
@@ -508,11 +512,6 @@
     return new DoublesSketchIterator(this, getBitPattern());
   }
 
-  @Override
-  public DoublesSortedView getSortedView() {
-    return new DoublesSketchSortedView(this);
-  }
-
   /**
    * {@inheritDoc}
    * <p>The parameter <i>k</i> will not change.</p>
@@ -538,10 +537,6 @@
     return newSketch;
   }
 
-private final void refreshSortedView() {
-  classicQdsSV = (classicQdsSV == null) ? new DoublesSketchSortedView(this) : classicQdsSV;
-}
-
   //Restricted abstract
 
   /**
@@ -579,4 +574,225 @@
    * @return the Memory if it exists, otherwise returns null.
    */
   abstract WritableMemory getMemory();
+
+  //************SORTED VIEW****************************
+
+  @Override
+  public final DoublesSketchSortedView getSortedView() {
+    return refreshSortedView();
+  }
+
+  private final DoublesSketchSortedView refreshSortedView() {
+    return (doublesSV == null) ? (doublesSV = getSV()) : doublesSV;
+  }
+
+  private DoublesSketchSortedView getSV() {
+    final long totalN = getN();
+    if (isEmpty() || (totalN == 0)) { throw new SketchesArgumentException(EMPTY_MSG); }
+    final int numQuantiles = getNumRetained();
+    final double[] svQuantiles = new double[numQuantiles];
+    final long[] svCumWeights = new long[numQuantiles];
+    final DoublesSketchAccessor sketchAccessor = DoublesSketchAccessor.wrap(this);
+
+    // Populate from DoublesSketch:
+    //  copy over the "levels" and then the base buffer, all with appropriate weights
+    populateFromDoublesSketch(getK(), totalN, getBitPattern(), sketchAccessor, svQuantiles, svCumWeights);
+
+    // Sort the first "numSamples" slots of the two arrays in tandem,
+    // taking advantage of the already sorted blocks of length k
+    blockyTandemMergeSort(svQuantiles, svCumWeights, numQuantiles, getK());
+
+    if (convertToCumulative(svCumWeights) != totalN) {
+      throw new SketchesStateException("Sorted View is misconfigured. TotalN does not match cumWeights.");
+    }
+    return new DoublesSketchSortedView(svQuantiles, svCumWeights, totalN, getMaxItem(), getMinItem());
+  }
+
+  private final static void populateFromDoublesSketch(
+      final int k, final long totalN, final long bitPattern,
+      final DoublesSketchAccessor sketchAccessor,
+      final double[] svQuantiles, final long[] svCumWeights) {
+    long weight = 1;
+    int index = 0;
+    long bits = bitPattern;
+    assert bits == (totalN / (2L * k)); // internal consistency check
+    for (int lvl = 0; bits != 0L; lvl++, bits >>>= 1) {
+      weight <<= 1; // X2
+      if ((bits & 1L) > 0L) {
+        sketchAccessor.setLevel(lvl);
+        for (int i = 0; i < sketchAccessor.numItems(); i++) {
+          svQuantiles[index] = sketchAccessor.get(i);
+          svCumWeights[index] = weight;
+          index++;
+        }
+      }
+    }
+
+    weight = 1; //NOT a mistake! We just copied the highest level; now we need to copy the base buffer
+    final int startOfBaseBufferBlock = index;
+
+    // Copy BaseBuffer over, along with weight = 1
+    sketchAccessor.setLevel(BB_LVL_IDX);
+    for (int i = 0; i < sketchAccessor.numItems(); i++) {
+      svQuantiles[index] = sketchAccessor.get(i);
+      svCumWeights[index] = weight;
+      index++;
+    }
+    assert index == svQuantiles.length;
+
+    // Must sort the items that came from the base buffer.
+    // Don't need to sort the corresponding weights because they are all the same.
+    final int numSamples = index;
+    Arrays.sort(svQuantiles, startOfBaseBufferBlock, numSamples);
+  }
+
+  /**
+   * blockyTandemMergeSort() is an implementation of top-down merge sort specialized
+   * for the case where the input contains successive equal-length blocks
+   * that have already been sorted, so that only the top part of the
+   * merge tree remains to be executed. Also, two arrays are sorted in tandem,
+   * as discussed below.
+   * @param svQuantiles array of quantiles for sorted view
+   * @param svCumWts array for the cumulative weights (but not yet cumulative) for sorted view
+   * @param arrLen length of quantiles array and cumWts array
+   * @param blkSize size of internal sorted blocks, equal to k
+   */
+  //used by this and UtilTest
+  static void blockyTandemMergeSort(final double[] svQuantiles, final long[] svCumWts, final int arrLen,
+      final int blkSize) {
+    assert blkSize >= 1;
+    if (arrLen <= blkSize) { return; }
+    int numblks = arrLen / blkSize;
+    if ((numblks * blkSize) < arrLen) { numblks += 1; }
+    assert ((numblks * blkSize) >= arrLen);
+
+    // duplication of the input arrays is preparation for the "ping-pong" copy reduction strategy.
+    final double[] qSrc = Arrays.copyOf(svQuantiles, arrLen);
+    final long[] cwSrc   = Arrays.copyOf(svCumWts, arrLen);
+
+    blockyTandemMergeSortRecursion(qSrc, cwSrc,
+                                   svQuantiles, svCumWts,
+                                   0, numblks,
+                                   blkSize, arrLen);
+  }
+
+  /**
+   *  blockyTandemMergeSortRecursion() is called by blockyTandemMergeSort().
+   *  In addition to performing the algorithm's top down recursion,
+   *  it manages the buffer swapping that eliminates most copying.
+   *  It also maps the input's pre-sorted blocks into the subarrays
+   *  that are processed by tandemMerge().
+   * @param qSrc source array of quantiles
+   * @param cwSrc source weights array
+   * @param qDst destination quantiles array
+   * @param cwDst destination weights array
+   * @param grpStart group start, refers to pre-sorted blocks such as block 0, block 1, etc.
+   * @param grpLen group length, refers to pre-sorted blocks such as block 0, block 1, etc.
+   * @param blkSize block size
+   * @param arrLim array limit
+   */
+  private static void blockyTandemMergeSortRecursion(final double[] qSrc, final long[] cwSrc,
+      final double[] qDst, final long[] cwDst, final int grpStart, final int grpLen,
+      /* indices of blocks */ final int blkSize, final int arrLim) {
+    // Important note: grpStart and grpLen do NOT refer to positions in the underlying array.
+    // Instead, they refer to the pre-sorted blocks, such as block 0, block 1, etc.
+
+    assert (grpLen > 0);
+    if (grpLen == 1) { return; }
+    final int grpLen1 = grpLen / 2;
+    final int grpLen2 = grpLen - grpLen1;
+    assert (grpLen1 >= 1);
+    assert (grpLen2 >= grpLen1);
+
+    final int grpStart1 = grpStart;
+    final int grpStart2 = grpStart + grpLen1;
+
+    //swap roles of src and dst
+    blockyTandemMergeSortRecursion(qDst, cwDst,
+                           qSrc, cwSrc,
+                           grpStart1, grpLen1, blkSize, arrLim);
+
+    //swap roles of src and dst
+    blockyTandemMergeSortRecursion(qDst, cwDst,
+                           qSrc, cwSrc,
+                           grpStart2, grpLen2, blkSize, arrLim);
+
+    // here we convert indices of blocks into positions in the underlying array.
+    final int arrStart1 = grpStart1 * blkSize;
+    final int arrStart2 = grpStart2 * blkSize;
+    final int arrLen1   = grpLen1   * blkSize;
+    int arrLen2         = grpLen2   * blkSize;
+
+    // special case for the final block which might be shorter than blkSize.
+    if ((arrStart2 + arrLen2) > arrLim) { arrLen2 = arrLim - arrStart2; }
+
+    tandemMerge(qSrc, cwSrc,
+                arrStart1, arrLen1,
+                arrStart2, arrLen2,
+                qDst, cwDst,
+                arrStart1); // which will be arrStart3
+  }
+
+  /**
+   *  Performs two merges in tandem. One of them provides the sort keys
+   *  while the other one passively undergoes the same data motion.
+   * @param qSrc quantiles source
+   * @param cwSrc cumulative weights source
+   * @param arrStart1 Array 1 start offset
+   * @param arrLen1 Array 1 length
+   * @param arrStart2 Array 2 start offset
+   * @param arrLen2 Array 2 length
+   * @param qDst quantiles destination
+   * @param cwDst cumulative weights destination
+   * @param arrStart3 Array 3 start offset
+   */
+  private static void tandemMerge(final double[] qSrc, final long[] cwSrc,
+                                  final int arrStart1, final int arrLen1,
+                                  final int arrStart2, final int arrLen2,
+                                  final double[] qDst, final long[] cwDst,
+                                  final int arrStart3) {
+    final int arrStop1 = arrStart1 + arrLen1;
+    final int arrStop2 = arrStart2 + arrLen2;
+
+    int i1 = arrStart1;
+    int i2 = arrStart2;
+    int i3 = arrStart3;
+    while ((i1 < arrStop1) && (i2 < arrStop2)) {
+      if (qSrc[i2] < qSrc[i1]) {
+        qDst[i3] = qSrc[i2];
+        cwDst[i3] = cwSrc[i2];
+        i2++;
+      } else {
+        qDst[i3] = qSrc[i1];
+        cwDst[i3] = cwSrc[i1];
+        i1++;
+      }
+      i3++;
+    }
+
+    if (i1 < arrStop1) {
+      arraycopy(qSrc, i1, qDst, i3, arrStop1 - i1);
+      arraycopy(cwSrc, i1, cwDst, i3, arrStop1 - i1);
+    } else {
+      assert i2 < arrStop2;
+      arraycopy(qSrc, i2, qDst, i3, arrStop2 - i2);
+      arraycopy(cwSrc, i2, cwDst, i3, arrStop2 - i2);
+    }
+  }
+
+  /**
+   * Convert the individual weights into cumulative weights.
+   * An array of {1,1,1,1} becomes {1,2,3,4}
+   * @param array of actual weights from the sketch, none of the weights may be zero
+   * @return total weight
+   */
+  private static long convertToCumulative(final long[] array) {
+    long subtotal = 0;
+    for (int i = 0; i < array.length; i++) {
+      final long newSubtotal = subtotal + array[i];
+      subtotal = array[i] = newSubtotal;
+    }
+    return subtotal;
+  }
+
 }
diff --git a/src/main/java/org/apache/datasketches/quantiles/DoublesSketchSortedView.java b/src/main/java/org/apache/datasketches/quantiles/DoublesSketchSortedView.java
deleted file mode 100644
index bb68061..0000000
--- a/src/main/java/org/apache/datasketches/quantiles/DoublesSketchSortedView.java
+++ /dev/null
@@ -1,354 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.datasketches.quantiles;
-
-import static java.lang.System.arraycopy;
-import static org.apache.datasketches.quantiles.DoublesSketchAccessor.BB_LVL_IDX;
-import static org.apache.datasketches.quantilescommon.QuantileSearchCriteria.INCLUSIVE;
-import static org.apache.datasketches.quantilescommon.QuantilesAPI.EMPTY_MSG;
-import static org.apache.datasketches.quantilescommon.QuantilesUtil.getNaturalRank;
-
-import java.util.Arrays;
-
-import org.apache.datasketches.common.SketchesArgumentException;
-import org.apache.datasketches.common.SketchesStateException;
-import org.apache.datasketches.quantilescommon.DoublesSortedView;
-import org.apache.datasketches.quantilescommon.DoublesSortedViewIterator;
-import org.apache.datasketches.quantilescommon.InequalitySearch;
-import org.apache.datasketches.quantilescommon.QuantileSearchCriteria;
-import org.apache.datasketches.quantilescommon.QuantilesUtil;
-
-/**
- * The SortedView of the Classic Quantiles DoublesSketch.
- * @author Alexander Saydakov
- * @author Lee Rhodes
- */
-public final class DoublesSketchSortedView implements DoublesSortedView {
-  private final double[] quantiles;
-  private final long[] cumWeights; //comes in as individual weights, converted to cumulative natural weights
-  private final long totalN;
-  private final double maxItem;
-  private final double minItem;
-
-  /**
-   * Construct from elements, also used in testing.
-   * @param quantiles sorted array of quantiles
-   * @param cumWeights sorted, monotonically increasing cumulative weights.
-   * @param totalN the total number of items presented to the sketch.
-   * @param maxItem of type double
-   * @param minItem of type double
-   */
-  DoublesSketchSortedView(final double[] quantiles, final long[] cumWeights, final long totalN,
-      final double maxItem, final double minItem) {
-    this.quantiles = quantiles;
-    this.cumWeights  = cumWeights;
-    this.totalN = totalN;
-    this.maxItem = maxItem;
-    this.minItem = minItem;
-  }
-
-  /**
-   * Constructs this Sorted View given the sketch
-   * @param sketch the given Classic Quantiles DoublesSketch
-   */
-  public DoublesSketchSortedView(final DoublesSketch sketch) {
-    if (sketch.isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
-    this.totalN = sketch.getN();
-    this.maxItem = sketch.getMaxItem();
-    this.minItem = sketch.getMinItem();
-    final int k = sketch.getK();
-    final int numQuantiles = sketch.getNumRetained();
-    quantiles = new double[numQuantiles];
-    cumWeights = new long[numQuantiles];
-    final DoublesSketchAccessor sketchAccessor = DoublesSketchAccessor.wrap(sketch);
-
-    // Populate from DoublesSketch:
-    //  copy over the "levels" and then the base buffer, all with appropriate weights
-    populateFromDoublesSketch(k, totalN, sketch.getBitPattern(), sketchAccessor, quantiles, cumWeights);
-
-    // Sort the first "numSamples" slots of the two arrays in tandem,
-    //  taking advantage of the already sorted blocks of length k
-    blockyTandemMergeSort(quantiles, cumWeights, numQuantiles, k);
-   if (convertToCumulative(cumWeights) != totalN) {
-     throw new SketchesStateException("Sorted View is misconfigured. TotalN does not match cumWeights.");
-   }
-  }
-
-  @Override
-  public long[] getCumulativeWeights() {
-    return cumWeights.clone();
-  }
-
-  @Override
-  public double getMaxItem() {
-    return maxItem;
-  }
-
-  @Override
-  public double getMinItem() {
-    return minItem;
-  }
-
-  @Override
-  public long getN() {
-    return totalN;
-  }
-
-  @Override
-  public double getQuantile(final double rank, final QuantileSearchCriteria searchCrit) {
-    if (isEmpty()) { throw new IllegalArgumentException(EMPTY_MSG); }
-    QuantilesUtil.checkNormalizedRankBounds(rank);
-    final int len = cumWeights.length;
-    final double naturalRank = getNaturalRank(rank, totalN, searchCrit);
-    final InequalitySearch crit = (searchCrit == INCLUSIVE) ? InequalitySearch.GE : InequalitySearch.GT;
-    final int index = InequalitySearch.find(cumWeights, 0, len - 1, naturalRank, crit);
-    if (index == -1) {
-      return quantiles[len - 1]; //EXCLUSIVE (GT) case: normRank == 1.0;
-    }
-    return quantiles[index];
-  }
-
-  @Override
-  public double getRank(final double quantile, final QuantileSearchCriteria searchCrit) {
-    if (isEmpty()) { throw new IllegalArgumentException(EMPTY_MSG); }
-    final int len = quantiles.length;
-    final InequalitySearch crit = (searchCrit == INCLUSIVE) ? InequalitySearch.LE : InequalitySearch.LT;
-    final int index = InequalitySearch.find(quantiles,  0, len - 1, quantile, crit);
-    if (index == -1) {
-      return 0; //EXCLUSIVE (LT) case: quantile <= minQuantile; INCLUSIVE (LE) case: quantile < minQuantile
-    }
-    return (double)cumWeights[index] / totalN;
-  }
-
-  @Override
-  public double[] getQuantiles() {
-    return quantiles.clone();
-  }
-
-  @Override
-  public boolean isEmpty() {
-    return totalN == 0;
-  }
-
-  @Override
-  public DoublesSortedViewIterator iterator() {
-    return new DoublesSortedViewIterator(quantiles, cumWeights);
-  }
-
-  //restricted methods
-
-  /**
-   * Populate the arrays and registers from a DoublesSketch
-   * @param k K parameter of the sketch
-   * @param n The current size of the stream
-   * @param bitPattern the bit pattern for valid log levels
-   * @param sketchAccessor A DoublesSketchAccessor around the sketch
-   * @param quantilesArr the consolidated array of all items from the sketch
-   * @param cumWtsArr populates this array with the raw individual weights from the sketch,
-   * it will be cumulative later.
-   */
-  private final static void populateFromDoublesSketch(
-          final int k, final long n, final long bitPattern,
-          final DoublesSketchAccessor sketchAccessor,
-          final double[] quantilesArr, final long[] cumWtsArr) {
-    long weight = 1;
-    int nxt = 0;
-    long bits = bitPattern;
-    assert bits == (n / (2L * k)); // internal consistency check
-    for (int lvl = 0; bits != 0L; lvl++, bits >>>= 1) {
-      weight *= 2;
-      if ((bits & 1L) > 0L) {
-        sketchAccessor.setLevel(lvl);
-        for (int i = 0; i < sketchAccessor.numItems(); i++) {
-          quantilesArr[nxt] = sketchAccessor.get(i);
-          cumWtsArr[nxt] = weight;
-          nxt++;
-        }
-      }
-    }
-
-    weight = 1; //NOT a mistake! We just copied the highest level; now we need to copy the base buffer
-    final int startOfBaseBufferBlock = nxt;
-
-    // Copy BaseBuffer over, along with weight = 1
-    sketchAccessor.setLevel(BB_LVL_IDX);
-    for (int i = 0; i < sketchAccessor.numItems(); i++) {
-      quantilesArr[nxt] = sketchAccessor.get(i);
-      cumWtsArr[nxt] = weight;
-      nxt++;
-    }
-    assert nxt == quantilesArr.length;
-
-    // Must sort the items that came from the base buffer.
-    // Don't need to sort the corresponding weights because they are all the same.
-    final int numSamples = nxt;
-    Arrays.sort(quantilesArr, startOfBaseBufferBlock, numSamples);
-  }
-
-  /**
-   * blockyTandemMergeSort() is an implementation of top-down merge sort specialized
-   * for the case where the input contains successive equal-length blocks
-   * that have already been sorted, so that only the top part of the
-   * merge tree remains to be executed. Also, two arrays are sorted in tandem,
-   * as discussed below.
-   * @param quantiles array of quantiles
-   * @param cumWts array of cum weights
-   * @param arrLen length of quantiles array and cumWts array
-   * @param blkSize size of internal sorted blocks
-   */
-  //used by this and UtilTest
-  static void blockyTandemMergeSort(final double[] quantiles, final long[] cumWts, final int arrLen,
-      final int blkSize) {
-    assert blkSize >= 1;
-    if (arrLen <= blkSize) { return; }
-    int numblks = arrLen / blkSize;
-    if ((numblks * blkSize) < arrLen) { numblks += 1; }
-    assert ((numblks * blkSize) >= arrLen);
-
-    // duplication of the input arrays is preparation for the "ping-pong" copy reduction strategy.
-    final double[] qSrc = Arrays.copyOf(quantiles, arrLen);
-    final long[] cwSrc   = Arrays.copyOf(cumWts, arrLen);
-
-    blockyTandemMergeSortRecursion(qSrc, cwSrc,
-                                   quantiles, cumWts,
-                                   0, numblks,
-                                   blkSize, arrLen);
-  }
-
-  /**
-   *  blockyTandemMergeSortRecursion() is called by blockyTandemMergeSort().
-   *  In addition to performing the algorithm's top down recursion,
-   *  it manages the buffer swapping that eliminates most copying.
-   *  It also maps the input's pre-sorted blocks into the subarrays
-   *  that are processed by tandemMerge().
-   * @param qSrc source array of quantiles
-   * @param cwSrc source weights array
-   * @param qDst destination quantiles array
-   * @param cwDst destination weights array
-   * @param grpStart group start, refers to pre-sorted blocks such as block 0, block 1, etc.
-   * @param grpLen group length, refers to pre-sorted blocks such as block 0, block 1, etc.
-   * @param blkSize block size
-   * @param arrLim array limit
-   */
-  private static void blockyTandemMergeSortRecursion(final double[] qSrc, final long[] cwSrc,
-      final double[] qDst, final long[] cwDst, final int grpStart, final int grpLen,
-      /* indices of blocks */ final int blkSize, final int arrLim) {
-    // Important note: grpStart and grpLen do NOT refer to positions in the underlying array.
-    // Instead, they refer to the pre-sorted blocks, such as block 0, block 1, etc.
-
-    assert (grpLen > 0);
-    if (grpLen == 1) { return; }
-    final int grpLen1 = grpLen / 2;
-    final int grpLen2 = grpLen - grpLen1;
-    assert (grpLen1 >= 1);
-    assert (grpLen2 >= grpLen1);
-
-    final int grpStart1 = grpStart;
-    final int grpStart2 = grpStart + grpLen1;
-
-    //swap roles of src and dst
-    blockyTandemMergeSortRecursion(qDst, cwDst,
-                           qSrc, cwSrc,
-                           grpStart1, grpLen1, blkSize, arrLim);
-
-    //swap roles of src and dst
-    blockyTandemMergeSortRecursion(qDst, cwDst,
-                           qSrc, cwSrc,
-                           grpStart2, grpLen2, blkSize, arrLim);
-
-    // here we convert indices of blocks into positions in the underlying array.
-    final int arrStart1 = grpStart1 * blkSize;
-    final int arrStart2 = grpStart2 * blkSize;
-    final int arrLen1   = grpLen1   * blkSize;
-    int arrLen2         = grpLen2   * blkSize;
-
-    // special case for the final block which might be shorter than blkSize.
-    if ((arrStart2 + arrLen2) > arrLim) { arrLen2 = arrLim - arrStart2; }
-
-    tandemMerge(qSrc, cwSrc,
-                arrStart1, arrLen1,
-                arrStart2, arrLen2,
-                qDst, cwDst,
-                arrStart1); // which will be arrStart3
-  }
-
-  /**
-   *  Performs two merges in tandem. One of them provides the sort keys
-   *  while the other one passively undergoes the same data motion.
-   * @param qSrc quantiles source
-   * @param cwSrc cumulative weights source
-   * @param arrStart1 Array 1 start offset
-   * @param arrLen1 Array 1 length
-   * @param arrStart2 Array 2 start offset
-   * @param arrLen2 Array 2 length
-   * @param qDst quantiles destination
-   * @param cwDst cumulative weights destination
-   * @param arrStart3 Array 3 start offset
-   */
-  private static void tandemMerge(final double[] qSrc, final long[] cwSrc,
-                                  final int arrStart1, final int arrLen1,
-                                  final int arrStart2, final int arrLen2,
-                                  final double[] qDst, final long[] cwDst,
-                                  final int arrStart3) {
-    final int arrStop1 = arrStart1 + arrLen1;
-    final int arrStop2 = arrStart2 + arrLen2;
-
-    int i1 = arrStart1;
-    int i2 = arrStart2;
-    int i3 = arrStart3;
-    while ((i1 < arrStop1) && (i2 < arrStop2)) {
-      if (qSrc[i2] < qSrc[i1]) {
-        qDst[i3] = qSrc[i2];
-        cwDst[i3] = cwSrc[i2];
-        i2++;
-      } else {
-        qDst[i3] = qSrc[i1];
-        cwDst[i3] = cwSrc[i1];
-        i1++;
-      }
-      i3++;
-    }
-
-    if (i1 < arrStop1) {
-      arraycopy(qSrc, i1, qDst, i3, arrStop1 - i1);
-      arraycopy(cwSrc, i1, cwDst, i3, arrStop1 - i1);
-    } else {
-      assert i2 < arrStop2;
-      arraycopy(qSrc, i2, qDst, i3, arrStop2 - i2);
-      arraycopy(cwSrc, i2, cwDst, i3, arrStop2 - i2);
-    }
-  }
-
-  /**
-   * Convert the individual weights into cumulative weights.
-   * An array of {1,1,1,1} becomes {1,2,3,4}
-   * @param array of actual weights from the sketch, none of the weights may be zero
-   * @return total weight
-   */
-  private static long convertToCumulative(final long[] array) {
-    long subtotal = 0;
-    for (int i = 0; i < array.length; i++) {
-      final long newSubtotal = subtotal + array[i];
-      subtotal = array[i] = newSubtotal;
-    }
-    return subtotal;
-  }
-
-}
diff --git a/src/main/java/org/apache/datasketches/quantiles/DoublesUnionImpl.java b/src/main/java/org/apache/datasketches/quantiles/DoublesUnionImpl.java
index b55f0b5..38854d3 100644
--- a/src/main/java/org/apache/datasketches/quantiles/DoublesUnionImpl.java
+++ b/src/main/java/org/apache/datasketches/quantiles/DoublesUnionImpl.java
@@ -123,14 +123,14 @@
   public void union(final DoublesSketch sketchIn) {
     Objects.requireNonNull(sketchIn);
     gadget_ = updateLogic(maxK_, gadget_, sketchIn);
-    gadget_.classicQdsSV = null;
+    gadget_.doublesSV = null;
   }
 
   @Override
   public void union(final Memory mem) {
     Objects.requireNonNull(mem);
     gadget_ = updateLogic(maxK_, gadget_, DoublesSketch.wrap(mem));
-    gadget_.classicQdsSV = null;
+    gadget_.doublesSV = null;
   }
 
   @Override
@@ -139,7 +139,7 @@
       gadget_ = HeapUpdateDoublesSketch.newInstance(maxK_);
     }
     gadget_.update(quantile);
-    gadget_.classicQdsSV = null;
+    gadget_.doublesSV = null;
   }
 
   @Override
diff --git a/src/main/java/org/apache/datasketches/quantiles/HeapUpdateDoublesSketch.java b/src/main/java/org/apache/datasketches/quantiles/HeapUpdateDoublesSketch.java
index 8282f00..2197d8b 100644
--- a/src/main/java/org/apache/datasketches/quantiles/HeapUpdateDoublesSketch.java
+++ b/src/main/java/org/apache/datasketches/quantiles/HeapUpdateDoublesSketch.java
@@ -270,7 +270,7 @@
       baseBufferCount_ = newBBCount;
     }
     n_ = newN;
-    classicQdsSV = null;
+    doublesSV = null;
   }
 
   /**
diff --git a/src/main/java/org/apache/datasketches/quantiles/ItemsSketch.java b/src/main/java/org/apache/datasketches/quantiles/ItemsSketch.java
index 54795d7..13aee81 100644
--- a/src/main/java/org/apache/datasketches/quantiles/ItemsSketch.java
+++ b/src/main/java/org/apache/datasketches/quantiles/ItemsSketch.java
@@ -531,12 +531,6 @@
   }
 
   @Override
-  public ItemsSketchSortedView<T> getSortedView() {
-    if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
-    return refreshSortedView();
-  }
-
-  @Override
   public void update(final T item) {
     // this method only uses the base buffer part of the combined buffer
 
@@ -631,69 +625,84 @@
     sketch.combinedBuffer_ = Arrays.copyOf(baseBuffer, newSize);
   }
 
-  private final ItemsSketchSortedView<T> refreshSortedView() {
-    if (classicQisSV == null) {
-      final CreateSortedView csv = new CreateSortedView();
-      classicQisSV = csv.getSV();
-    }
-    return classicQisSV;
+  //************SORTED VIEW****************************
+
+  @Override
+  public ItemsSketchSortedView<T> getSortedView() {
+    return refreshSortedView();
   }
 
-  @SuppressWarnings({"rawtypes","unchecked"})
-  private final class CreateSortedView {
-    final long n = getN();
-    final int numQuantiles = getNumRetained();
-    final T[] quantiles = (T[]) Array.newInstance(clazz, numQuantiles);
-    long[] cumWeights = new long[numQuantiles];
-    final int k = getK();
+  private final ItemsSketchSortedView<T> refreshSortedView() {
+    return (classicQisSV == null) ? (classicQisSV = getSV(this)) : classicQisSV;
+  }
 
-    final T[] combinedBuffer = (T[]) getCombinedBuffer();
-    final int baseBufferCount = getBaseBufferCount();
-    final Comparator<? super T> comparator = ItemsSketch.this.comparator_;
+  @SuppressWarnings({"unchecked"})
+  private static <T> ItemsSketchSortedView<T> getSV(final ItemsSketch<T> sk) {
+    final long totalN = sk.getN();
+    if (sk.isEmpty() || (totalN == 0)) { throw new SketchesArgumentException(EMPTY_MSG); }
+    final int k = sk.getK();
+    final int numQuantiles = sk.getNumRetained();
+    final T[] svQuantiles = (T[]) Array.newInstance(sk.clazz, numQuantiles);
+    final long[] svCumWeights = new long[numQuantiles];
+    final Comparator<? super T> comparator = sk.comparator_;
 
-    ItemsSketchSortedView<T> getSV() {
-      long weight = 1;
-      int index = 0;
-      long bits = getBitPattern();
-      assert bits == (n / (2L * k)); // internal consistency check
-      for (int lvl = 0; bits != 0L; lvl++, bits >>>= 1) {
-        weight *= 2;
-        if ((bits & 1L) > 0L) {
-          final int offset = (2 + lvl) * k;
-          for (int i = 0; i < k; i++) {
-            quantiles[index] = combinedBuffer[i + offset];
-            cumWeights[index] = weight;
-            index++;
-          }
+    final T[] combinedBuffer = (T[]) sk.getCombinedBuffer();
+    final int baseBufferCount = sk.getBaseBufferCount();
+
+    // Populate from ItemsSketch:
+    // copy over the "levels" and then the base buffer, all with appropriate weights
+    populateFromItemsSketch(k, totalN, sk.getBitPattern(), combinedBuffer, baseBufferCount,
+        numQuantiles, svQuantiles, svCumWeights, sk.getComparator());
+
+    // Sort the first "numSamples" slots of the two arrays in tandem,
+    // taking advantage of the already sorted blocks of length k
+    ItemsMergeImpl.blockyTandemMergeSort(svQuantiles, svCumWeights, numQuantiles, k, comparator);
+
+    if (convertToCumulative(svCumWeights) != totalN) {
+      throw new SketchesStateException("Sorted View is misconfigured. TotalN does not match cumWeights.");
+    }
+
+    final double normRankErr = getNormalizedRankError(sk.getK(), true);
+    return new ItemsSketchSortedView<>(
+        svQuantiles, svCumWeights, sk.getN(), comparator, sk.getMaxItem(), sk.getMinItem(), normRankErr);
+
+  }
+
+  private final static <T> void populateFromItemsSketch(
+      final int k, final long totalN, final long bitPattern, final T[] combinedBuffer,
+      final int baseBufferCount, final int numQuantiles, final T[] svQuantiles, final long[] svCumWeights,
+      final Comparator<? super T> comparator) {
+
+    long weight = 1;
+    int index = 0;
+    long bits = bitPattern;
+    assert bits == (totalN / (2L * k)); // internal consistency check
+    for (int lvl = 0; bits != 0L; lvl++, bits >>>= 1) {
+      weight <<= 1; // X2
+      if ((bits & 1L) > 0L) {
+        final int offset = (2 + lvl) * k;
+        for (int i = 0; i < k; i++) {
+          svQuantiles[index] = combinedBuffer[i + offset];
+          svCumWeights[index] = weight;
+          index++;
         }
       }
-
-      weight = 1; //NOT a mistake! We just copied the highest level; now we need to copy the base buffer
-      final int startOfBaseBufferBlock = index;
-
-      // Copy BaseBuffer over, along with weight = 1
-      for (int i = 0; i < baseBufferCount; i++) {
-        quantiles[index] = combinedBuffer[i];
-        cumWeights[index] = weight;
-        index++;
-      }
-      assert index == numQuantiles;
-
-      // Must sort the items that came from the base buffer.
-      // Don't need to sort the corresponding weights because they are all the same.
-      Arrays.sort(quantiles, startOfBaseBufferBlock, numQuantiles, comparator);
-
-      // Sort the first "numSamples" slots of the two arrays in tandem,
-      // taking advantage of the already sorted blocks of length k
-      ItemsMergeImpl.blockyTandemMergeSort(quantiles, cumWeights, numQuantiles, k, comparator);
-
-      if (convertToCumulative(cumWeights) != n) {
-        throw new SketchesStateException("Sorted View is misconfigured. TotalN does not match cumWeights.");
-      }
-      final double normRankErr = getNormalizedRankError(getK(), true);
-      return new ItemsSketchSortedView(
-          quantiles, cumWeights, getN(), comparator, getMaxItem(), getMinItem(), normRankErr);
     }
+
+    weight = 1; //NOT a mistake! We just copied the highest level; now we need to copy the base buffer
+    final int startOfBaseBufferBlock = index;
+
+    // Copy BaseBuffer over, along with weight = 1
+    for (int i = 0; i < baseBufferCount; i++) {
+      svQuantiles[index] = combinedBuffer[i];
+      svCumWeights[index] = weight;
+      index++;
+    }
+    assert index == numQuantiles;
+
+    // Must sort the items that came from the base buffer.
+    // Don't need to sort the corresponding weights because they are all the same.
+    Arrays.sort(svQuantiles, startOfBaseBufferBlock, numQuantiles, comparator);
   }
 
   /**
diff --git a/src/main/java/org/apache/datasketches/quantiles/KolmogorovSmirnov.java b/src/main/java/org/apache/datasketches/quantiles/KolmogorovSmirnov.java
index c290b05..6b8079e 100644
--- a/src/main/java/org/apache/datasketches/quantiles/KolmogorovSmirnov.java
+++ b/src/main/java/org/apache/datasketches/quantiles/KolmogorovSmirnov.java
@@ -19,6 +19,8 @@
 
 package org.apache.datasketches.quantiles;
 
+import org.apache.datasketches.quantilescommon.DoublesSketchSortedView;
+
 /**
  * Kolmogorov-Smirnov Test
  * See <a href="https://en.wikipedia.org/wiki/Kolmogorov-Smirnov_test">Kolmogorov–Smirnov Test</a>
@@ -36,8 +38,8 @@
    * @return the raw delta area between two quantile sketches
    */
   public static double computeKSDelta(final DoublesSketch sketch1, final DoublesSketch sketch2) {
-    final DoublesSketchSortedView p = new DoublesSketchSortedView(sketch1);
-    final DoublesSketchSortedView q = new DoublesSketchSortedView(sketch2);
+    final DoublesSketchSortedView p = sketch1.getSortedView();
+    final DoublesSketchSortedView q = sketch2.getSortedView();
 
     final double[] pSamplesArr = p.getQuantiles();
     final double[] qSamplesArr = q.getQuantiles();
diff --git a/src/main/java/org/apache/datasketches/quantilescommon/DoublesSketchSortedView.java b/src/main/java/org/apache/datasketches/quantilescommon/DoublesSketchSortedView.java
new file mode 100644
index 0000000..b033236
--- /dev/null
+++ b/src/main/java/org/apache/datasketches/quantilescommon/DoublesSketchSortedView.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.datasketches.quantilescommon;
+
+import static org.apache.datasketches.quantilescommon.QuantileSearchCriteria.INCLUSIVE;
+import static org.apache.datasketches.quantilescommon.QuantilesAPI.EMPTY_MSG;
+import static org.apache.datasketches.quantilescommon.QuantilesUtil.getNaturalRank;
+
+import org.apache.datasketches.common.SketchesArgumentException;
+
+/**
+ * The SortedView of the Quantiles Classic DoublesSketch and the KllDoublesSketch.
+ * @author Alexander Saydakov
+ * @author Lee Rhodes
+ */
+public final class DoublesSketchSortedView implements DoublesSortedView {
+  private final double[] quantiles;
+  private final long[] cumWeights; //comes in as individual weights, converted to cumulative natural weights
+  private final long totalN;
+  private final double maxItem;
+  private final double minItem;
+
+  /**
+   * Construct from elements, also used in testing.
+   * @param quantiles sorted array of quantiles
+   * @param cumWeights sorted, monotonically increasing cumulative weights.
+   * @param totalN the total number of items presented to the sketch.
+   * @param maxItem of type double
+   * @param minItem of type double
+   */
+  public DoublesSketchSortedView(final double[] quantiles, final long[] cumWeights, final long totalN,
+      final double maxItem, final double minItem) {
+    this.quantiles = quantiles;
+    this.cumWeights  = cumWeights;
+    this.totalN = totalN;
+    this.maxItem = maxItem;
+    this.minItem = minItem;
+  }
+
+  @Override
+  public long[] getCumulativeWeights() {
+    return cumWeights.clone();
+  }
+
+  @Override
+  public double getMaxItem() {
+    return maxItem;
+  }
+
+  @Override
+  public double getMinItem() {
+    return minItem;
+  }
+
+  @Override
+  public long getN() {
+    return totalN;
+  }
+
+  @Override
+  public double getQuantile(final double rank, final QuantileSearchCriteria searchCrit) {
+    if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
+    QuantilesUtil.checkNormalizedRankBounds(rank);
+    final int len = cumWeights.length;
+    final double naturalRank = getNaturalRank(rank, totalN, searchCrit);
+    final InequalitySearch crit = (searchCrit == INCLUSIVE) ? InequalitySearch.GE : InequalitySearch.GT;
+    final int index = InequalitySearch.find(cumWeights, 0, len - 1, naturalRank, crit);
+    if (index == -1) {
+      return quantiles[len - 1]; //EXCLUSIVE (GT) case: normRank == 1.0;
+    }
+    return quantiles[index];
+  }
+
+  @Override
+  public double[] getQuantiles() {
+    return quantiles.clone();
+  }
+
+  @Override
+  public double getRank(final double quantile, final QuantileSearchCriteria searchCrit) {
+    if (isEmpty()) { throw new SketchesArgumentException(EMPTY_MSG); }
+    final int len = quantiles.length;
+    final InequalitySearch crit = (searchCrit == INCLUSIVE) ? InequalitySearch.LE : InequalitySearch.LT;
+    final int index = InequalitySearch.find(quantiles,  0, len - 1, quantile, crit);
+    if (index == -1) {
+      return 0; //EXCLUSIVE (LT) case: quantile <= minQuantile; INCLUSIVE (LE) case: quantile < minQuantile
+    }
+    return (double)cumWeights[index] / totalN;
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return totalN == 0;
+  }
+
+  @Override
+  public DoublesSortedViewIterator iterator() {
+    return new DoublesSortedViewIterator(quantiles, cumWeights);
+  }
+
+}
diff --git a/src/main/java/org/apache/datasketches/tdigest/TDigestDouble.java b/src/main/java/org/apache/datasketches/tdigest/TDigestDouble.java
index 9a93b1c..b1a22e3 100644
--- a/src/main/java/org/apache/datasketches/tdigest/TDigestDouble.java
+++ b/src/main/java/org/apache/datasketches/tdigest/TDigestDouble.java
@@ -20,7 +20,7 @@
 package org.apache.datasketches.tdigest;
 
 import java.nio.ByteOrder;
-import java.util.function.Function;
+import java.util.Arrays;
 
 import org.apache.datasketches.common.Family;
 import org.apache.datasketches.common.SketchesArgumentException;
@@ -47,7 +47,6 @@
 
   private boolean reverseMerge_;
   private final short k_;
-  private final short internalK_;
   private double minValue_;
   private double maxValue_;
   private int centroidsCapacity_;
@@ -55,11 +54,10 @@
   private double[] centroidMeans_;
   private long[] centroidWeights_;
   private long centroidsWeight_;
-  private int bufferCapacity_;
   private int numBuffered_;
   private double[] bufferValues_;
-  private long[] bufferWeights_;
-  private long bufferedWeight_;
+
+  private static final int BUFFER_MULTIPLIER = 4;
 
   private static final byte PREAMBLE_LONGS_EMPTY_OR_SINGLE = 1;
   private static final byte PREAMBLE_LONGS_MULTIPLE = 2;
@@ -82,7 +80,7 @@
    * @param k affects the size of TDigest and its estimation error
    */
   public TDigestDouble(final short k) {
-    this(false, k, Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY, null, null, 0);
+    this(false, k, Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY, null, null, 0, null);
   }
 
   /**
@@ -98,11 +96,9 @@
    */
   public void update(final double value) {
     if (Double.isNaN(value)) { return; }
-    if (numBuffered_ == bufferCapacity_ - numCentroids_) { mergeBuffered(); }
+    if (numBuffered_ == centroidsCapacity_ * BUFFER_MULTIPLIER) { compress(); }
     bufferValues_[numBuffered_] = value;
-    bufferWeights_[numBuffered_] = 1;
     numBuffered_++;
-    bufferedWeight_++;
     minValue_ = Math.min(minValue_, value);
     maxValue_ = Math.max(maxValue_, value);
   }
@@ -114,36 +110,28 @@
   public void merge(final TDigestDouble other) {
     if (other.isEmpty()) { return; }
     final int num = numCentroids_ + numBuffered_ + other.numCentroids_ + other.numBuffered_;
-    if (num <= bufferCapacity_) {
-      System.arraycopy(other.bufferValues_, 0, bufferValues_, numBuffered_, other.numBuffered_);
-      System.arraycopy(other.bufferWeights_, 0, bufferWeights_, numBuffered_, other.numBuffered_);
-      numBuffered_ += other.numBuffered_;
-      System.arraycopy(other.centroidMeans_, 0, bufferValues_, numBuffered_, other.numCentroids_);
-      System.arraycopy(other.centroidWeights_, 0, bufferWeights_, numBuffered_, other.numCentroids_);
-      numBuffered_ += other.numCentroids_;
-      bufferedWeight_ += other.getTotalWeight();
-      minValue_ = Math.min(minValue_, other.minValue_);
-      maxValue_ = Math.max(maxValue_, other.maxValue_);
-    } else {
-      final double[] values = new double[num];
-      final long[] weights = new long[num];
-      System.arraycopy(bufferValues_, 0, values, 0, numBuffered_);
-      System.arraycopy(bufferWeights_, 0, weights, 0, numBuffered_);
-      System.arraycopy(other.bufferValues_, 0, values, numBuffered_, other.numBuffered_);
-      System.arraycopy(other.bufferWeights_, 0, weights, numBuffered_, other.numBuffered_);
-      numBuffered_ += other.numBuffered_;
-      System.arraycopy(other.centroidMeans_, 0, values, numBuffered_, other.numCentroids_);
-      System.arraycopy(other.centroidWeights_, 0, weights, numBuffered_, other.numCentroids_);
-      numBuffered_ += other.numCentroids_;
-      merge(values, weights, bufferedWeight_ + other.getTotalWeight(), numBuffered_);
-    }
+    final double[] values = new double[num];
+    final long[] weights = new long[num];
+    System.arraycopy(bufferValues_, 0, values, 0, numBuffered_);
+    Arrays.fill(weights, 0, numBuffered_, 1);
+    System.arraycopy(other.bufferValues_, 0, values, numBuffered_, other.numBuffered_);
+    Arrays.fill(weights, numBuffered_, numBuffered_ + other.numBuffered_, 1);
+    System.arraycopy(other.centroidMeans_, 0, values, numBuffered_ + other.numBuffered_, other.numCentroids_);
+    System.arraycopy(other.centroidWeights_, 0, weights, numBuffered_ + other.numBuffered_, other.numCentroids_);
+    merge(values, weights, numBuffered_ + other.getTotalWeight(), numBuffered_ + other.numBuffered_ + other.numCentroids_);
   }
 
   /**
    * Process buffered values and merge centroids if needed
    */
   public void compress() {
-    mergeBuffered();
+    if (numBuffered_ == 0) { return; }
+    final int num = numBuffered_ + numCentroids_;
+    final double[] values =  new double[num];
+    final long[] weights = new long[num];
+    System.arraycopy(bufferValues_, 0, values, 0, numBuffered_);
+    Arrays.fill(weights, 0, numBuffered_, 1);
+    merge(values, weights, numBuffered_, numBuffered_);
   }
 
   /**
@@ -173,7 +161,7 @@
    * @return total weight
    */
   public long getTotalWeight() {
-    return centroidsWeight_ + bufferedWeight_;
+    return centroidsWeight_ + numBuffered_;
   }
 
   /**
@@ -188,7 +176,7 @@
     if (value > maxValue_) { return 1; }
     if (numCentroids_ + numBuffered_ == 1) { return 0.5; }
 
-    mergeBuffered(); // side effect
+    compress(); // side effect
 
     // left tail
     final double firstMean = centroidMeans_[0];
@@ -244,7 +232,7 @@
     if (Double.isNaN(rank)) { throw new SketchesArgumentException("Operation is undefined for Nan"); }
     if (rank < 0 || rank > 1) { throw new SketchesArgumentException("Normalized rank must be within [0, 1]"); } 
     
-    mergeBuffered(); // side effect
+    compress(); // side effect
 
     if (numCentroids_ == 1) { return centroidMeans_[0]; }
 
@@ -293,7 +281,7 @@
    * @return size in bytes needed to serialize this tdigest
    */
   int getSerializedSizeBytes() {
-    mergeBuffered(); // side effect
+    compress(); // side effect
     return getPreambleLongs() * Long.BYTES
     + (isEmpty() ? 0 : (isSingleValue() ? Double.BYTES : 2 * Double.BYTES + (Double.BYTES + Long.BYTES) * numCentroids_));
   }
@@ -303,7 +291,7 @@
    * @return byte array
    */
   public byte[] toByteArray() {
-    mergeBuffered(); // side effect
+    compress(); // side effect
     final byte[] bytes = new byte[getSerializedSizeBytes()];
     final WritableBuffer wbuf = WritableMemory.writableWrap(bytes).asWritableBuffer();
     wbuf.putByte((byte) getPreambleLongs());
@@ -380,7 +368,7 @@
       } else {
         value = buff.getDouble();
       }
-      return new TDigestDouble(reverseMerge, k, value, value, new double[] {value}, new long[] {1}, 1);
+      return new TDigestDouble(reverseMerge, k, value, value, new double[] {value}, new long[] {1}, 1, null);
     }
     final int numCentroids = buff.getInt();
     buff.getInt(); // unused
@@ -401,7 +389,7 @@
       weights[i] = isFloat ? buff.getInt() : buff.getLong();
       totalWeight += weights[i];
     }
-    return new TDigestDouble(reverseMerge, k, min, max, means, weights, totalWeight);
+    return new TDigestDouble(reverseMerge, k, min, max, means, weights, totalWeight, null);
   }
 
   // compatibility with the format of the reference implementation
@@ -425,7 +413,7 @@
         means[i] = buff.getDouble();
         totalWeight += weights[i];
       }
-      return new TDigestDouble(false, k, min, max, means, weights, totalWeight);
+      return new TDigestDouble(false, k, min, max, means, weights, totalWeight, null);
     }
     // COMPAT_FLOAT: compatibility with asSmallBytes()
     final double min = buff.getDouble(); // reference implementation uses doubles for min and max
@@ -443,7 +431,7 @@
       means[i] = buff.getFloat();
       totalWeight += weights[i];
     }
-    return new TDigestDouble(false, k, min, max, means, weights, totalWeight);
+    return new TDigestDouble(false, k, min, max, means, weights, totalWeight, null);
   }
 
   /**
@@ -464,14 +452,12 @@
     final StringBuilder sb = new StringBuilder();
 
     sb.append("MergingDigest").append(LS)
-      .append(" Nominal Compression: ").append(k_).append(LS)
-      .append(" Internal Compression: ").append(internalK_).append(LS)
+      .append(" Compression: ").append(k_).append(LS)
       .append(" Centroids: ").append(numCentroids_).append(LS)
       .append(" Buffered: ").append(numBuffered_).append(LS)
       .append(" Centroids Capacity: ").append(centroidsCapacity_).append(LS)
-      .append(" Buffer Capacity: ").append(bufferCapacity_).append(LS)
+      .append(" Buffer Capacity: ").append(centroidsCapacity_ * BUFFER_MULTIPLIER).append(LS)
       .append("Centroids Weight: ").append(centroidsWeight_).append(LS)
-      .append(" Buffered Weight: ").append(bufferedWeight_).append(LS)
       .append(" Total Weight: ").append(getTotalWeight()).append(LS)
       .append(" Reverse Merge: ").append(reverseMerge_).append(LS);
     if (!isEmpty()) {
@@ -488,7 +474,7 @@
       if (numBuffered_ > 0) {
         sb.append("Buffer:").append(LS);
         for (int i = 0; i < numBuffered_; i++) {
-          sb.append(i).append(": ").append(bufferValues_[i]).append(", ").append(bufferWeights_[i]).append(LS);
+          sb.append(i).append(": ").append(bufferValues_[i]).append(LS);
         }
       }
     }
@@ -496,37 +482,29 @@
   }
 
   private TDigestDouble(final boolean reverseMerge, final short k, final double min, final double max,
-      final double[] means, final long[] weights, final long weight) {
-    reverseMerge_ = reverseMerge; 
+      final double[] means, final long[] weights, final long weight, final double[] buffer) {
+    reverseMerge_ = reverseMerge;
     k_ = k;
     minValue_ = min;
     maxValue_ = max;
     if (k < 10) { throw new SketchesArgumentException("k must be at least 10"); }
     final int fudge = k < 30 ? 30 : 10;
     centroidsCapacity_ = k_ * 2 + fudge;
-    bufferCapacity_ = centroidsCapacity_ * 5;
-    final double scale = Math.max(1.0, (double) bufferCapacity_ / centroidsCapacity_ - 1.0);
-    internalK_ = (short) Math.ceil(Math.sqrt(scale) * k_);
-    centroidsCapacity_ = Math.max(centroidsCapacity_, internalK_ + fudge);
-    bufferCapacity_ = Math.max(bufferCapacity_, centroidsCapacity_ * 2);
     centroidMeans_ = new double[centroidsCapacity_];
     centroidWeights_ = new long[centroidsCapacity_];
-    bufferValues_ =  new double[bufferCapacity_];
-    bufferWeights_ = new long[bufferCapacity_];
+    bufferValues_ =  new double[centroidsCapacity_ * BUFFER_MULTIPLIER];
     numCentroids_ = 0;
     numBuffered_ = 0;
     centroidsWeight_ = weight;
-    bufferedWeight_ = 0;
     if (means != null && weights != null) {
       System.arraycopy(means, 0, centroidMeans_, 0, means.length);
       System.arraycopy(weights, 0, centroidWeights_, 0, weights.length);
       numCentroids_ = means.length;
     }
-  }
-
-  private void mergeBuffered() {
-    if (numBuffered_ == 0) { return; }
-    merge(bufferValues_, bufferWeights_, bufferedWeight_, numBuffered_);
+    if (buffer != null) {
+      System.arraycopy(buffer, 0, bufferValues_, 0, buffer.length);
+      numBuffered_ = buffer.length;
+    }
   }
 
   // assumes that there is enough room in the input arrays to add centroids from this TDigest
@@ -552,7 +530,7 @@
       if (current != 1 && current != num - 1) {
         final double q0 = weightSoFar / centroidsWeight_;
         final double q2 = (weightSoFar + proposedWeight) / centroidsWeight_;
-        final double normalizer = ScaleFunction.normalizer(internalK_, centroidsWeight_);
+        final double normalizer = ScaleFunction.normalizer(k_ * 2, centroidsWeight_);
         addThis = proposedWeight <= centroidsWeight_ * Math.min(ScaleFunction.max(q0, normalizer), ScaleFunction.max(q2, normalizer));
       }
       if (addThis) { // merge into existing centroid
@@ -572,7 +550,6 @@
       Sort.reverse(centroidWeights_, numCentroids_);
     }
     numBuffered_ = 0;
-    bufferedWeight_ = 0;
     reverseMerge_ = !reverseMerge_;
     minValue_ = Math.min(minValue_, centroidMeans_[0]);
     maxValue_ = Math.max(maxValue_, centroidMeans_[numCentroids_ - 1]);
@@ -604,7 +581,7 @@
       return 4 * Math.log(n / compression) + 24;
     }
   }
-  
+
   private static double weightedAverage(final double x1, final double w1, final double x2, final double w2) {
     return (x1 * w1 + x2 * w2) / (w1 + w2);
   }
diff --git a/src/test/java/org/apache/datasketches/quantiles/CustomQuantilesTest.java b/src/test/java/org/apache/datasketches/quantiles/CustomQuantilesTest.java
index d319388..101c26c 100644
--- a/src/test/java/org/apache/datasketches/quantiles/CustomQuantilesTest.java
+++ b/src/test/java/org/apache/datasketches/quantiles/CustomQuantilesTest.java
@@ -26,6 +26,7 @@
 import static org.apache.datasketches.quantilescommon.QuantilesUtil.getNaturalRank;
 import static org.testng.Assert.assertEquals;
 
+import org.apache.datasketches.quantilescommon.DoublesSketchSortedView;
 import org.testng.annotations.Test;
 
 public class CustomQuantilesTest {
@@ -55,7 +56,7 @@
       }
     }
     long N = sk.getN();
-    DoublesSketchSortedView sv = new DoublesSketchSortedView(sk);
+    DoublesSketchSortedView sv = sk.getSortedView();
     double[] quantilesArr = sv.getQuantiles();
     long[] cumWtsArr = sv.getCumulativeWeights();
     int lenQ = quantilesArr.length;
diff --git a/src/test/java/org/apache/datasketches/quantiles/UtilTest.java b/src/test/java/org/apache/datasketches/quantiles/UtilTest.java
index 0ca3ea4..49546df 100644
--- a/src/test/java/org/apache/datasketches/quantiles/UtilTest.java
+++ b/src/test/java/org/apache/datasketches/quantiles/UtilTest.java
@@ -165,7 +165,6 @@
  }
 
  /**
-  *
   * @param numTries number of tries
   * @param maxArrLen maximum length of array size
   */
@@ -178,7 +177,7 @@
          arr = makeMergeTestInput(arrLen, blkSize);
          long [] brr = makeTheTandemArray(arr);
          assertMergeTestPrecondition(arr, brr, arrLen, blkSize);
-         DoublesSketchSortedView.blockyTandemMergeSort(arr, brr, arrLen, blkSize);
+         DoublesSketch.blockyTandemMergeSort(arr, brr, arrLen, blkSize);
          /* verify sorted order */
          for (int i = 0; i < (arrLen-1); i++) {
            assert arr[i] <= arr[i+1];
diff --git a/src/test/java/org/apache/datasketches/quantilescommon/CrossCheckQuantilesTest.java b/src/test/java/org/apache/datasketches/quantilescommon/CrossCheckQuantilesTest.java
index dbbbb65..6049f2f 100644
--- a/src/test/java/org/apache/datasketches/quantilescommon/CrossCheckQuantilesTest.java
+++ b/src/test/java/org/apache/datasketches/quantilescommon/CrossCheckQuantilesTest.java
@@ -29,8 +29,7 @@
 import static org.apache.datasketches.quantilescommon.LinearRanksAndQuantiles.getTrueItemRank;
 import static org.apache.datasketches.quantilescommon.QuantileSearchCriteria.EXCLUSIVE;
 import static org.apache.datasketches.quantilescommon.QuantileSearchCriteria.INCLUSIVE;
-import static org.apache.datasketches.quantilescommon.ReflectUtilityTest.CLASSIC_DOUBLES_SV_CTOR;
-import static org.apache.datasketches.quantilescommon.ReflectUtilityTest.KLL_DOUBLES_SV_CTOR;
+import static org.apache.datasketches.quantilescommon.ReflectUtilityTest.DOUBLES_SV_CTOR;
 import static org.apache.datasketches.quantilescommon.ReflectUtilityTest.KLL_FLOATS_SV_CTOR;
 import static org.apache.datasketches.quantilescommon.ReflectUtilityTest.REQ_SV_CTOR;
 import static org.testng.Assert.assertEquals;
@@ -40,13 +39,11 @@
 import org.apache.datasketches.common.ArrayOfStringsSerDe;
 import org.apache.datasketches.common.SketchesArgumentException;
 import org.apache.datasketches.kll.KllDoublesSketch;
-import org.apache.datasketches.kll.KllDoublesSketchSortedView;
 import org.apache.datasketches.kll.KllFloatsSketch;
 import org.apache.datasketches.kll.KllFloatsSketchSortedView;
 import org.apache.datasketches.kll.KllItemsSketch;
 import org.apache.datasketches.kll.KllSketch;
 import org.apache.datasketches.quantiles.DoublesSketch;
-import org.apache.datasketches.quantiles.DoublesSketchSortedView;
 import org.apache.datasketches.quantiles.ItemsSketch;
 import org.apache.datasketches.quantiles.UpdateDoublesSketch;
 import org.apache.datasketches.req.ReqSketch;
@@ -146,10 +143,10 @@
 
   ReqSketchSortedView reqFloatsSV = null;
   KllFloatsSketchSortedView kllFloatsSV = null;
-  KllDoublesSketchSortedView kllDoublesSV = null;
+  DoublesSketchSortedView kllDoublesSV = null;
   DoublesSketchSortedView classicDoublesSV = null;
   ItemsSketchSortedView<String> kllItemsSV = null;
-  ItemsSketchSortedView<String> itemsSV = null;
+  ItemsSketchSortedView<String> classicItemsSV = null;
 
   public CrossCheckQuantilesTest() {}
 
@@ -221,7 +218,7 @@
       testRank = kllItemsSk.getRank(s, crit);
       assertEquals(testRank, trueRank);
 
-      testRank = itemsSV.getRank(s, crit);
+      testRank = classicItemsSV.getRank(s, crit);
       assertEquals(testRank, trueRank);
       testRank = itemsSk.getRank(s, crit);
       assertEquals(testRank, trueRank);
@@ -286,7 +283,7 @@
       testIQ = kllItemsSk.getQuantile(normRank, crit);
       assertEquals(testIQ, trueIQ);
 
-      testIQ = itemsSV.getQuantile(normRank, crit);
+      testIQ = classicItemsSV.getQuantile(normRank, crit);
       assertEquals(testIQ, trueIQ);
       testIQ = itemsSk.getQuantile(normRank, crit);
       assertEquals(testIQ, trueIQ);
@@ -339,9 +336,9 @@
         svMaxFValues[set], svMinFValues[set]);
     kllFloatsSV = getRawKllFloatsSV(svFValues[set], svCumWeights[set], totalN[set],
         svMaxFValues[set], svMinFValues[set]);
-    kllDoublesSV = getRawKllDoublesSV(svDValues[set], svCumWeights[set], totalN[set],
+    kllDoublesSV = getRawDoublesSV(svDValues[set], svCumWeights[set], totalN[set],
         svMaxDValues[set], svMinDValues[set]);
-    classicDoublesSV = getRawClassicDoublesSV(svDValues[set], svCumWeights[set], totalN[set],
+    classicDoublesSV = getRawDoublesSV(svDValues[set], svCumWeights[set], totalN[set],
         svMaxDValues[set], svMinDValues[set]);
     String svImax = svIValues[set][svIValues[set].length - 1];
     String svImin = svIValues[set][0];
@@ -349,7 +346,7 @@
     kllItemsSV = new ItemsSketchSortedView<>(svIValues[set], svCumWeights[set], totalN[set],
         comparator, svImax, svImin, normRankErr);
     normRankErr = ItemsSketch.getNormalizedRankError(k, true);
-    itemsSV = new ItemsSketchSortedView<>(svIValues[set], svCumWeights[set], totalN[set],
+    classicItemsSV = new ItemsSketchSortedView<>(svIValues[set], svCumWeights[set], totalN[set],
         comparator, svImax, svImin, normRankErr);
   }
 
@@ -365,16 +362,10 @@
     return (KllFloatsSketchSortedView) KLL_FLOATS_SV_CTOR.newInstance(values, cumWeights, totalN, maxItem, minItem);
   }
 
-  private final static KllDoublesSketchSortedView getRawKllDoublesSV(
+  private final static DoublesSketchSortedView getRawDoublesSV(
       final double[] values, final long[] cumWeights, final long totalN, final double maxItem, final double minItem)
           throws Exception {
-    return (KllDoublesSketchSortedView) KLL_DOUBLES_SV_CTOR.newInstance(values, cumWeights, totalN, maxItem, minItem);
-  }
-
-  private final static DoublesSketchSortedView getRawClassicDoublesSV(
-      final double[] values, final long[] cumWeights, final long totalN, final double maxItem, final double minItem)
-          throws Exception {
-    return (DoublesSketchSortedView) CLASSIC_DOUBLES_SV_CTOR.newInstance(values, cumWeights, totalN, maxItem, minItem);
+    return (DoublesSketchSortedView) DOUBLES_SV_CTOR.newInstance(values, cumWeights, totalN, maxItem, minItem);
   }
 
   /********BUILD DATA SETS**********/
diff --git a/src/test/java/org/apache/datasketches/quantilescommon/ReflectUtilityTest.java b/src/test/java/org/apache/datasketches/quantilescommon/ReflectUtilityTest.java
index 191629f..6dbcac2 100644
--- a/src/test/java/org/apache/datasketches/quantilescommon/ReflectUtilityTest.java
+++ b/src/test/java/org/apache/datasketches/quantilescommon/ReflectUtilityTest.java
@@ -36,28 +36,23 @@
 
   static final Class<?> REQ_SV;
   static final Class<?> KLL_FLOATS_SV;
-  static final Class<?> KLL_DOUBLES_SV;
-  static final Class<?> CLASSIC_DOUBLES_SV;
+  static final Class<?> DOUBLES_SV;
 
   static final Constructor<?> REQ_SV_CTOR;
   static final Constructor<?> KLL_FLOATS_SV_CTOR;
-  static final Constructor<?> KLL_DOUBLES_SV_CTOR;
-  static final Constructor<?> CLASSIC_DOUBLES_SV_CTOR;
+  static final Constructor<?> DOUBLES_SV_CTOR;
 
   static {
     REQ_SV = getClass("org.apache.datasketches.req.ReqSketchSortedView");
     KLL_FLOATS_SV = getClass("org.apache.datasketches.kll.KllFloatsSketchSortedView");
-    KLL_DOUBLES_SV = getClass("org.apache.datasketches.kll.KllDoublesSketchSortedView");
-    CLASSIC_DOUBLES_SV = getClass("org.apache.datasketches.quantiles.DoublesSketchSortedView");
+    DOUBLES_SV = getClass("org.apache.datasketches.quantilescommon.DoublesSketchSortedView");
 
     REQ_SV_CTOR =
         getConstructor(REQ_SV, float[].class, long[].class, long.class, float.class, float.class);
     KLL_FLOATS_SV_CTOR =
         getConstructor(KLL_FLOATS_SV, float[].class, long[].class, long.class, float.class, float.class);
-    KLL_DOUBLES_SV_CTOR =
-        getConstructor(KLL_DOUBLES_SV, double[].class, long[].class, long.class, double.class, double.class);
-    CLASSIC_DOUBLES_SV_CTOR =
-        getConstructor(CLASSIC_DOUBLES_SV, double[].class, long[].class, long.class, double.class, double.class);
+    DOUBLES_SV_CTOR =
+        getConstructor(DOUBLES_SV, double[].class, long[].class, long.class, double.class, double.class);
   }
 
   @Test //Example