Merge pull request #376 from apache/FixSetOpLeakage

Fix set op leakage
diff --git a/src/main/java/org/apache/datasketches/tuple/AnotB.java b/src/main/java/org/apache/datasketches/tuple/AnotB.java
index 5d9702e..74012ce 100644
--- a/src/main/java/org/apache/datasketches/tuple/AnotB.java
+++ b/src/main/java/org/apache/datasketches/tuple/AnotB.java
@@ -25,7 +25,6 @@
 import static org.apache.datasketches.Util.REBUILD_THRESHOLD;
 import static org.apache.datasketches.Util.simpleLog2OfLong;
 
-import java.lang.reflect.Array;
 import java.lang.reflect.Method;
 import java.util.Arrays;
 
@@ -119,7 +118,7 @@
 
     empty_ = skA.isEmpty();
     thetaLong_ = skA.getThetaLong();
-    final DataArrays<S> da = getDataArraysTuple(skA);
+    final DataArrays<S> da = getCopyOfDataArraysTuple(skA);
     summaryArr_ = da.summaryArr;  //it may be null
     hashArr_ = da.hashArr;        //it may be null
     curCount_ = (hashArr_ == null) ? 0 : hashArr_.length;
@@ -174,7 +173,7 @@
       }
       case SKA_TRIM: {
         thetaLong_ = min(thetaLong_, thetaLongB);
-        final DataArrays<S> da = trimDataArrays(hashArr_, summaryArr_, thetaLong_);
+        final DataArrays<S> da = trimAndCopyDataArrays(hashArr_, summaryArr_, thetaLong_, true);
         hashArr_ = da.hashArr;
         curCount_ = (hashArr_ == null) ? 0 : hashArr_.length;
         summaryArr_ = da.summaryArr;
@@ -186,7 +185,7 @@
       }
       case FULL_ANOTB: { //both A and B should have valid entries.
         thetaLong_ = min(thetaLong_, thetaLongB);
-        final DataArrays<S> daR = getAnotbResultArraysTuple(thetaLong_, curCount_, hashArr_, summaryArr_, skB);
+        final DataArrays<S> daR = getCopyOfResultArraysTuple(thetaLong_, curCount_, hashArr_, summaryArr_, skB);
         hashArr_ = daR.hashArr;
         curCount_ = (hashArr_ == null) ? 0 : hashArr_.length;
         summaryArr_ = daR.summaryArr;
@@ -247,7 +246,7 @@
       }
       case SKA_TRIM: {
         thetaLong_ = min(thetaLong_, thetaLongB);
-        final DataArrays<S> da = trimDataArrays(hashArr_, summaryArr_,thetaLong_);
+        final DataArrays<S> da = trimAndCopyDataArrays(hashArr_, summaryArr_,thetaLong_, true);
         hashArr_ = da.hashArr;
         curCount_ = (hashArr_ == null) ? 0 : hashArr_.length;
         summaryArr_ = da.summaryArr;
@@ -258,7 +257,7 @@
       }
       case FULL_ANOTB: { //both A and B should have valid entries.
         thetaLong_ = min(thetaLong_, thetaLongB);
-        final DataArrays<S> daB = getAnotbResultArraysTheta(thetaLong_, curCount_, hashArr_, summaryArr_, skB);
+        final DataArrays<S> daB = getCopyOfResultArraysTheta(thetaLong_, curCount_, hashArr_, summaryArr_, skB);
         hashArr_ = daB.hashArr;
         curCount_ = (hashArr_ == null) ? 0 : hashArr_.length;
         summaryArr_ = daB.summaryArr;
@@ -282,7 +281,8 @@
     if (curCount_ == 0) {
       result = new CompactSketch<>(null, null, thetaLong_, thetaLong_ == Long.MAX_VALUE);
     } else {
-      result = new CompactSketch<>(hashArr_, summaryArr_, thetaLong_, false);
+
+      result = new CompactSketch<>(hashArr_, Util.copySummaryArray(summaryArr_), thetaLong_, false);
     }
     if (reset) { reset(); }
     return result;
@@ -349,24 +349,24 @@
         break;
       }
       case SKA_TRIM: {
-        final DataArrays<S> daA = getDataArraysTuple(skA);
+        final DataArrays<S> daA = getCopyOfDataArraysTuple(skA);
         final long[] hashArrA = daA.hashArr;
         final S[] summaryArrA = daA.summaryArr;
         final long minThetaLong =  min(thetaLongA, thetaLongB);
-        final DataArrays<S> da = trimDataArrays(hashArrA, summaryArrA, minThetaLong);
+        final DataArrays<S> da = trimAndCopyDataArrays(hashArrA, summaryArrA, minThetaLong, false);
         result = new CompactSketch<>(da.hashArr, da.summaryArr, minThetaLong, skA.empty_);
         break;
       }
       case SKETCH_A: {
-        final DataArrays<S> daA = getDataArraysTuple(skA);
+        final DataArrays<S> daA = getCopyOfDataArraysTuple(skA);
         result = new CompactSketch<>(daA.hashArr, daA.summaryArr, thetaLongA, skA.empty_);
         break;
       }
       case FULL_ANOTB: { //both A and B should have valid entries.
-        final DataArrays<S> daA = getDataArraysTuple(skA);
+        final DataArrays<S> daA = getCopyOfDataArraysTuple(skA);
         final long minThetaLong = min(thetaLongA, thetaLongB);
         final DataArrays<S> daR =
-            getAnotbResultArraysTuple(minThetaLong, daA.hashArr.length, daA.hashArr, daA.summaryArr, skB);
+            getCopyOfResultArraysTuple(minThetaLong, daA.hashArr.length, daA.hashArr, daA.summaryArr, skB);
         final int countR = (daR.hashArr == null) ? 0 : daR.hashArr.length;
         if (countR == 0) {
           result = new CompactSketch<>(null, null, minThetaLong, minThetaLong == Long.MAX_VALUE);
@@ -441,24 +441,24 @@
         break;
       }
       case SKA_TRIM: {
-        final DataArrays<S> daA = getDataArraysTuple(skA);
+        final DataArrays<S> daA = getCopyOfDataArraysTuple(skA);
         final long[] hashArrA = daA.hashArr;
         final S[] summaryArrA = daA.summaryArr;
         final long minThetaLong = min(thetaLongA, thetaLongB);
-        final DataArrays<S> da = trimDataArrays(hashArrA, summaryArrA, minThetaLong);
+        final DataArrays<S> da = trimAndCopyDataArrays(hashArrA, summaryArrA, minThetaLong, false);
         result = new CompactSketch<>(da.hashArr, da.summaryArr, minThetaLong, skA.empty_);
         break;
       }
       case SKETCH_A: {
-        final DataArrays<S> daA = getDataArraysTuple(skA);
+        final DataArrays<S> daA = getCopyOfDataArraysTuple(skA);
         result = new CompactSketch<>(daA.hashArr, daA.summaryArr, thetaLongA, skA.empty_);
         break;
       }
       case FULL_ANOTB: { //both A and B should have valid entries.
-        final DataArrays<S> daA = getDataArraysTuple(skA);
+        final DataArrays<S> daA = getCopyOfDataArraysTuple(skA);
         final long minThetaLong = min(thetaLongA, thetaLongB);
         final DataArrays<S> daR =
-            getAnotbResultArraysTheta(minThetaLong, daA.hashArr.length, daA.hashArr, daA.summaryArr, skB);
+            getCopyOfResultArraysTheta(minThetaLong, daA.hashArr.length, daA.hashArr, daA.summaryArr, skB);
         final int countR = (daR.hashArr == null) ? 0 : daR.hashArr.length;
         if (countR == 0) {
           result = new CompactSketch<>(null, null, minThetaLong, minThetaLong == Long.MAX_VALUE);
@@ -478,7 +478,8 @@
     S[] summaryArr;
   }
 
-  private static <S extends Summary> DataArrays<S> getDataArraysTuple(
+  @SuppressWarnings("unchecked")
+  private static <S extends Summary> DataArrays<S> getCopyOfDataArraysTuple(
       final Sketch<S> sk) {
     final CompactSketch<S> csk;
     final DataArrays<S> da = new DataArrays<>();
@@ -493,40 +494,14 @@
       da.summaryArr = null;
     } else {
       da.hashArr = csk.getHashArr().clone();       //deep copy, may not be sorted
-      da.summaryArr = csk.getSummaryArr().clone(); //shallow copy
+      da.summaryArr = Util.copySummaryArray(csk.getSummaryArr());
     }
     return da;
   }
 
   @SuppressWarnings("unchecked")
-  private static <S extends Summary> DataArrays<S> trimDataArrays(
-      final long[] hashArr,
-      final S[] summaryArr,
-      final long minThetaLong) {
-
-    //build temporary arrays
-    final int countIn = hashArr.length;
-    final long[] tmpHashArr = new long[countIn];
-    final Class<S> summaryType = (Class<S>) summaryArr.getClass().getComponentType();
-    final S[] tmpSummaryArr = (S[]) Array.newInstance(summaryType, countIn);
-    int countResult = 0;
-    for (int i = 0; i < countIn; i++) {
-      final long hash = hashArr[i];
-      if (hash < minThetaLong) {
-        tmpHashArr[countResult] = hash;
-        tmpSummaryArr[countResult] = summaryArr[i];
-        countResult++;
-      } else { continue; }
-    }
-    final DataArrays<S> da = new DataArrays<>();
-    da.hashArr = Arrays.copyOfRange(tmpHashArr, 0, countResult);
-    da.summaryArr = Arrays.copyOfRange(tmpSummaryArr, 0, countResult);
-    return da;
-  }
-
-  @SuppressWarnings("unchecked")
   //Both skA and skB must have entries (count > 0)
-  private static <S extends Summary> DataArrays<S> getAnotbResultArraysTuple(
+  private static <S extends Summary> DataArrays<S> getCopyOfResultArraysTuple(
       final long minThetaLong,
       final int countA,
       final long[] hashArrA,
@@ -548,8 +523,7 @@
 
     //build temporary arrays of skA
     final long[] tmpHashArrA = new long[countA];
-    final Class<S> summaryType = (Class<S>) summaryArrA.getClass().getComponentType();
-    final S[] tmpSummaryArrA = (S[]) Array.newInstance(summaryType, countA);
+    final S[] tmpSummaryArrA = Util.newSummaryArray(summaryArrA, countA);
 
     //search for non matches and build temp arrays
     final int lgHTBLen = simpleLog2OfLong(hashTableB.length);
@@ -560,7 +534,7 @@
         final int index = hashSearch(hashTableB, lgHTBLen, hash);
         if (index == -1) {
           tmpHashArrA[nonMatches] = hash;
-          tmpSummaryArrA[nonMatches] = summaryArrA[i];
+          tmpSummaryArrA[nonMatches] = (S) summaryArrA[i].copy();
           nonMatches++;
         }
       }
@@ -571,7 +545,7 @@
   }
 
   @SuppressWarnings("unchecked")
-  private static <S extends Summary> DataArrays<S> getAnotbResultArraysTheta(
+  private static <S extends Summary> DataArrays<S> getCopyOfResultArraysTheta(
       final long minThetaLong,
       final int countA,
       final long[] hashArrA,
@@ -595,8 +569,7 @@
 
     //build temporary result arrays of skA
     final long[] tmpHashArrA = new long[countA];
-    final Class<S> summaryType = (Class<S>) summaryArrA.getClass().getComponentType();
-    final S[] tmpSummaryArrA = (S[]) Array.newInstance(summaryType, countA);
+    final S[] tmpSummaryArrA = Util.newSummaryArray(summaryArrA, countA);
 
     //search for non matches and build temp arrays
     final int lgHTBLen = simpleLog2OfLong(hashTableB.length);
@@ -607,7 +580,7 @@
         final int index = hashSearch(hashTableB, lgHTBLen, hash);
         if (index == -1) { //not found
           tmpHashArrA[nonMatches] = hash;
-          tmpSummaryArrA[nonMatches] = summaryArrA[i];
+          tmpSummaryArrA[nonMatches] = (S) summaryArrA[i].copy();
           nonMatches++;
         }
       }
@@ -618,6 +591,33 @@
     return daB;
   }
 
+  @SuppressWarnings("unchecked")
+  private static <S extends Summary> DataArrays<S> trimAndCopyDataArrays(
+      final long[] hashArr,
+      final S[] summaryArr,
+      final long minThetaLong,
+      final boolean copy) {
+
+    //build temporary arrays
+    final int countIn = hashArr.length;
+    final long[] tmpHashArr = new long[countIn];
+    final S[] tmpSummaryArr = Util.newSummaryArray(summaryArr, countIn);
+    int countResult = 0;
+    for (int i = 0; i < countIn; i++) {
+      final long hash = hashArr[i];
+      if (hash < minThetaLong) {
+        tmpHashArr[countResult] = hash;
+        tmpSummaryArr[countResult] = (S) (copy ? summaryArr[i].copy() : summaryArr[i]);
+        countResult++;
+      } else { continue; }
+    }
+    //Remove empty slots
+    final DataArrays<S> da = new DataArrays<>();
+    da.hashArr = Arrays.copyOfRange(tmpHashArr, 0, countResult);
+    da.summaryArr = Arrays.copyOfRange(tmpSummaryArr, 0, countResult);
+    return da;
+  }
+
   /**
    * Resets this operation back to the empty state.
    */
diff --git a/src/main/java/org/apache/datasketches/tuple/HashTables.java b/src/main/java/org/apache/datasketches/tuple/HashTables.java
index ed905a1..5749d79 100644
--- a/src/main/java/org/apache/datasketches/tuple/HashTables.java
+++ b/src/main/java/org/apache/datasketches/tuple/HashTables.java
@@ -97,12 +97,10 @@
       final long thetaLong,
       final SummarySetOperations<S> summarySetOps) {
 
-    final Class<S> summaryType = (Class<S>) summaryTable_.getClass().getComponentType();
-
     //Match nextSketch data with local instance data, filtering by theta
     final int maxMatchSize = min(count_, nextTupleSketch.getRetainedEntries());
     final long[] matchHashArr = new long[maxMatchSize];
-    final S[] matchSummariesArr = (S[]) Array.newInstance(summaryType, maxMatchSize);
+    final S[] matchSummariesArr = Util.newSummaryArray(summaryTable_, maxMatchSize);
     int matchCount = 0;
     final SketchIterator<S> it = nextTupleSketch.iterator();
 
diff --git a/src/main/java/org/apache/datasketches/tuple/Intersection.java b/src/main/java/org/apache/datasketches/tuple/Intersection.java
index 168b400..c8b9cda 100644
--- a/src/main/java/org/apache/datasketches/tuple/Intersection.java
+++ b/src/main/java/org/apache/datasketches/tuple/Intersection.java
@@ -25,8 +25,6 @@
 import static org.apache.datasketches.Util.MIN_LG_NOM_LONGS;
 import static org.apache.datasketches.Util.ceilingPowerOf2;
 
-import java.lang.reflect.Array;
-
 import org.apache.datasketches.SketchesArgumentException;
 import org.apache.datasketches.SketchesStateException;
 
@@ -200,19 +198,18 @@
       return new CompactSketch<>(null, null, thetaLong_, empty_);
     }
 
-    //Compact the hash tables
     final int tableSize = hashTables_.hashTable_.length;
 
     final long[] hashArr = new long[countIn];
-    final Class<S> summaryType = (Class<S>) hashTables_.summaryTable_.getClass().getComponentType();
-    final S[] summaryArr = (S[]) Array.newInstance(summaryType, countIn);
+    final S[] summaryArr = Util.newSummaryArray(hashTables_.summaryTable_, countIn);
 
+    //compact the arrays
     int cnt = 0;
     for (int i = 0; i < tableSize; i++) {
       final long hash = hashTables_.hashTable_[i];
       if (hash == 0 || hash > thetaLong_) { continue; }
       hashArr[cnt] = hash;
-      summaryArr[cnt] = hashTables_.summaryTable_[i];
+      summaryArr[cnt] = (S) hashTables_.summaryTable_[i].copy();
       cnt++;
     }
     assert cnt == countIn;
diff --git a/src/main/java/org/apache/datasketches/tuple/QuickSelectSketch.java b/src/main/java/org/apache/datasketches/tuple/QuickSelectSketch.java
index 12eea66..cba7ff6 100644
--- a/src/main/java/org/apache/datasketches/tuple/QuickSelectSketch.java
+++ b/src/main/java/org/apache/datasketches/tuple/QuickSelectSketch.java
@@ -299,8 +299,7 @@
       return new CompactSketch<>(null, null, thetaLong_, false);
     }
     final long[] hashArr = new long[getRetainedEntries()];
-    final S[] summaryArr = (S[])
-      Array.newInstance(summaryTable_.getClass().getComponentType(), getRetainedEntries());
+    final S[] summaryArr = Util.newSummaryArray(summaryTable_, getRetainedEntries());
     int i = 0;
     for (int j = 0; j < hashTable_.length; j++) {
       if (summaryTable_[j] != null) {
@@ -418,7 +417,7 @@
       if (index < 0) {
         insertSummary(~index, (S)summary.copy()); //did not find, so insert
       } else {
-        insertSummary(index, summarySetOps.union(summaryTable_[index], summary));
+        insertSummary(index, summarySetOps.union(summaryTable_[index], (S) summary.copy()));
       }
       rebuildIfNeeded();
     }
@@ -487,9 +486,8 @@
   private void resize(final int newSize) {
     final long[] oldHashTable = hashTable_;
     final S[] oldSummaryTable = summaryTable_;
-    final Class<S> summaryType = (Class<S>) summaryTable_.getClass().getComponentType();
     hashTable_ = new long[newSize];
-    summaryTable_ = (S[]) Array.newInstance(summaryType, newSize);
+    summaryTable_ = Util.newSummaryArray(summaryTable_, newSize);
     lgCurrentCapacity_ = Integer.numberOfTrailingZeros(newSize);
     count_ = 0;
     for (int i = 0; i < oldHashTable.length; i++) {
diff --git a/src/main/java/org/apache/datasketches/tuple/Union.java b/src/main/java/org/apache/datasketches/tuple/Union.java
index 147c44b..e28dd1d 100644
--- a/src/main/java/org/apache/datasketches/tuple/Union.java
+++ b/src/main/java/org/apache/datasketches/tuple/Union.java
@@ -22,8 +22,6 @@
 import static java.lang.Math.min;
 import static org.apache.datasketches.Util.DEFAULT_NOMINAL_ENTRIES;
 
-import java.lang.reflect.Array;
-
 import org.apache.datasketches.QuickSelect;
 import org.apache.datasketches.SketchesArgumentException;
 
@@ -63,7 +61,7 @@
 
   /**
    * Perform a stateless, pair-wise union operation between two tuple sketches.
-   * The returned sketch will be cutback to the smaller of the two k values if required.
+   * The returned sketch will be cut back to the smaller of the two k values if required.
    *
    * <p>Nulls and empty sketches are ignored.</p>
    *
@@ -82,7 +80,7 @@
 
   /**
    * Perform a stateless, pair-wise union operation between a tupleSketch and a thetaSketch.
-   * The returned sketch will be cutback to the smaller of the two k values if required.
+   * The returned sketch will be cut back to the smaller of the two k values if required.
    *
    * <p>Nulls and empty sketches are ignored.</p>
    *
@@ -139,7 +137,7 @@
     if (thetaIn < thetaLong_) { thetaLong_ = thetaIn; }
     final org.apache.datasketches.theta.HashIterator it = thetaSketch.iterator();
     while (it.next()) {
-      qsk_.merge(it.get(), (S)summary.copy(), summarySetOps_);
+      qsk_.merge(it.get(), summary, summarySetOps_); //copies summary
     }
     if (qsk_.thetaLong_ < thetaLong_) {
       thetaLong_ = qsk_.thetaLong_;
@@ -181,9 +179,8 @@
       theta = QuickSelect.select(hashArr, 0, numHashes - 1, qsk_.getNominalEntries());
       numHashes = qsk_.getNominalEntries();
     }
-    final Class<S> summaryType = (Class<S>) qsk_.getSummaryTable().getClass().getComponentType();
     final long[] hashArr = new long[numHashes];
-    final S[] summaries = (S[]) Array.newInstance(summaryType, numHashes);
+    final S[] summaries = Util.newSummaryArray(qsk_.getSummaryTable(), numHashes);
     final SketchIterator<S> it = qsk_.iterator();
     int i = 0;
     while (it.next()) {
diff --git a/src/main/java/org/apache/datasketches/tuple/Util.java b/src/main/java/org/apache/datasketches/tuple/Util.java
index 587da94..4523476 100644
--- a/src/main/java/org/apache/datasketches/tuple/Util.java
+++ b/src/main/java/org/apache/datasketches/tuple/Util.java
@@ -27,6 +27,8 @@
 import static org.apache.datasketches.memory.XxHash.hashCharArr;
 import static org.apache.datasketches.memory.XxHash.hashString;
 
+import java.lang.reflect.Array;
+
 import org.apache.datasketches.SketchesArgumentException;
 
 
@@ -135,4 +137,22 @@
     return hashCharArr(s.toCharArray(), 0, s.length(), PRIME);
   }
 
+  @SuppressWarnings("unchecked")
+  public static <S extends Summary> S[] copySummaryArray(final S[] summaryArr) {
+    final int len = summaryArr.length;
+    final S[] tmpSummaryArr = newSummaryArray(summaryArr, len);
+    for (int i = 0; i < len; i++) {
+      tmpSummaryArr[i] = (S) summaryArr[i].copy();
+    }
+    return tmpSummaryArr;
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <S extends Summary> S[] newSummaryArray(final S[] summaryArr, final int length) {
+    final Class<S> summaryType = (Class<S>) summaryArr.getClass().getComponentType();
+    final S[] tmpSummaryArr = (S[]) Array.newInstance(summaryType, length);
+    return tmpSummaryArr;
+  }
+
+
 }
diff --git a/src/test/java/org/apache/datasketches/tuple/aninteger/ParameterLeakageTest.java b/src/test/java/org/apache/datasketches/tuple/aninteger/ParameterLeakageTest.java
index aa9a36d..166f023 100644
--- a/src/test/java/org/apache/datasketches/tuple/aninteger/ParameterLeakageTest.java
+++ b/src/test/java/org/apache/datasketches/tuple/aninteger/ParameterLeakageTest.java
@@ -118,18 +118,17 @@
   public void checkAnotbStateless() {
     IntegerSketch sk1 = new IntegerSketch(4, Sum);
     sk1.update(1, 1);
-    IntegerSummary sk1sum = captureSummaries(sk1)[0];
+    CompactSketch<IntegerSummary> csk1 = sk1.compact();
+    IntegerSummary sk1sum = captureSummaries(csk1)[0];
 
-    IntegerSketch sk2 = new IntegerSketch(4, Sum);
-    sk2.update(2, 1);
-    IntegerSummary sk2sum = captureSummaries(sk2)[0];
+    IntegerSketch sk2 = new IntegerSketch(4, Sum); //EMPTY
 
-    CompactSketch<IntegerSummary> csk = AnotB.aNotB(sk1, sk2);
+    CompactSketch<IntegerSummary> csk = AnotB.aNotB(csk1, sk2);
     IntegerSummary[] summaries = captureSummaries(csk);
     println("AnotB Stateless Count: " + summaries.length);
 
     for (IntegerSummary isum : summaries) {
-      if ((isum == sk1sum) || (isum == sk2sum)) {
+      if (isum == sk1sum) {
         throw new IllegalArgumentException("Parameter Leakage");
       }
     }
@@ -139,15 +138,14 @@
   public void checkAnotbStateful() {
     IntegerSketch sk1 = new IntegerSketch(4, Sum);
     sk1.update(1, 1);
-    IntegerSummary sk1sum = captureSummaries(sk1)[0];
+    CompactSketch<IntegerSummary> csk1 = sk1.compact();
+    IntegerSummary sk1sum = captureSummaries(csk1)[0];
 
-    IntegerSketch sk2 = new IntegerSketch(4, Sum);
-    sk2.update(2, 1);
-    IntegerSummary sk2sum = captureSummaries(sk2)[0];
+    IntegerSketch sk2 = new IntegerSketch(4, Sum); //EMPTY
 
     AnotB<IntegerSummary> anotb = new AnotB<>();
 
-    anotb.setA(sk1);
+    anotb.setA(csk1);
     anotb.notB(sk2);
 
     CompactSketch<IntegerSummary> csk = anotb.getResult(true);
@@ -155,7 +153,7 @@
     println("AnotB Stateful Count: " + summaries.length);
 
     for (IntegerSummary isum : summaries) {
-      if ((isum == sk1sum) || (isum == sk2sum)) {
+      if (isum == sk1sum) {
         throw new IllegalArgumentException("Parameter Leakage");
       }
     }