/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.datasketches.quantiles;

import static java.lang.System.arraycopy;
import static org.apache.datasketches.Util.checkIfPowerOf2;

import java.util.Arrays;
import java.util.Comparator;

import org.apache.datasketches.SketchesArgumentException;

/**
* Down-sampling and merge algorithms for items quantiles.
*
* @author Lee Rhodes
* @author Alexander Saydakov
* @author Kevin Lang
*/
final class ItemsMergeImpl {

private ItemsMergeImpl() {}

/**
* Merges the source sketch into the target sketch, which may have a smaller value of K.
* However, it is required that the ratio of the two K values be a power of 2.
* I.e., source.getK() = target.getK() * 2^(nonnegative integer).
* The source is not modified.
*
* <p>Note: it is easy to prove that the following simplified code, which launches multiple
* waves of carry propagation, does exactly the same amount of merging work (including the
* work of allocating fresh buffers) as the more complicated and seemingly more efficient
* approach that tracks a single carry propagation wave through both sketches.
*
* <p>This simplified code probably does slightly more "outer loop" work, but I am pretty
* sure that even that is within a constant factor of the more complicated code, plus the
* total amount of "outer loop" work is at least a factor of K smaller than the total amount of
* merging work, which is identical in the two approaches.
*
* <p>Note: a two-way merge that doesn't modify either of its two inputs could be implemented
* by making a deep copy of the larger sketch and then merging the smaller one into it.
* However, it was decided not to do this.
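*
* <p>A minimal usage sketch (hypothetical; it assumes the ItemsSketch.getInstance(k, comparator)
* factory and String items, neither of which is defined in this file):
* <pre>{@code
* ItemsSketch<String> src = ItemsSketch.getInstance(256, Comparator.naturalOrder());
* ItemsSketch<String> tgt = ItemsSketch.getInstance(128, Comparator.naturalOrder());
* // ... feed items to both sketches via update() ...
* ItemsMergeImpl.mergeInto(src, tgt); // OK: 256 = 128 * 2^1
* // a K ratio that is not a power of 2 would throw a SketchesArgumentException
* }</pre>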
*
* @param <T> the data type
* @param src The source sketch
* @param tgt The target sketch
*/
@SuppressWarnings("unchecked")
static <T> void mergeInto(final ItemsSketch<T> src, final ItemsSketch<T> tgt) {
final int srcK = src.getK();
final int tgtK = tgt.getK();
final long srcN = src.getN();
final long tgtN = tgt.getN();
if (srcK != tgtK) {
downSamplingMergeInto(src, tgt);
return;
}
//The remainder of this code is for the case where the k's are equal
final Object[] srcCombBuf = src.getCombinedBuffer();
final long nFinal = tgtN + srcN;
for (int i = 0; i < src.getBaseBufferCount(); i++) { //update only the base buffer
tgt.update((T) srcCombBuf[i]);
}
ItemsUpdateImpl.maybeGrowLevels(tgt, nFinal);
final Object[] scratchBuf = new Object[2 * tgtK];
long srcBitPattern = src.getBitPattern();
assert srcBitPattern == (srcN / (2L * srcK));
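// e.g., srcK = 128, srcN = 1280: bit pattern = 1280 / 256 = 5 = 0b101, so src levels 0 and 2 are valid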
for (int srcLvl = 0; srcBitPattern != 0L; srcLvl++, srcBitPattern >>>= 1) {
if ((srcBitPattern & 1L) > 0L) { //this source level is valid (populated)
ItemsUpdateImpl.inPlacePropagateCarry(
srcLvl,
(T[]) srcCombBuf, (2 + srcLvl) * tgtK,
(T[]) scratchBuf, 0,
false,
tgt);
// won't update tgt.n_ until the very end
}
}
tgt.n_ = nFinal;
assert (tgt.getN() / (2L * tgtK)) == tgt.getBitPattern(); // internal consistency check
final T srcMax = src.getMaxValue();
final T srcMin = src.getMinValue();
final T tgtMax = tgt.getMaxValue();
final T tgtMin = tgt.getMinValue();
if ((srcMax != null) && (tgtMax != null)) {
tgt.maxValue_ = (src.getComparator().compare(srcMax, tgtMax) > 0) ? srcMax : tgtMax;
} //only one could be null
else if (tgtMax == null) { //if srcMax were null we would leave tgt alone
tgt.maxValue_ = srcMax;
}
if ((srcMin != null) && (tgtMin != null)) {
tgt.minValue_ = (src.getComparator().compare(srcMin, tgtMin) > 0) ? tgtMin : srcMin;
} //only one could be null
else if (tgtMin == null) { //if srcMin were null we would leave tgt alone
tgt.minValue_ = srcMin;
}
}

/**
* Merges the source sketch into the target sketch, which may have a smaller value of K.
* However, it is required that the ratio of the two K values be a power of 2.
* I.e., source.getK() = target.getK() * 2^(nonnegative integer).
* The source is not modified.
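*
* <p>For example (illustrative numbers only): if source.getK() = 256 and target.getK() = 64,
* then downFactor = 4 (accepted, since it is a power of 2), lgDownFactor = 2, and each valid
* source level is down-sampled by keeping every 4th item before it is propagated into the target.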
* @param <T> the data type
* @param src The source sketch
* @param tgt The target sketch
*/
@SuppressWarnings("unchecked") //also used by ItemsSketch and ItemsUnion
static <T> void downSamplingMergeInto(final ItemsSketch<T> src, final ItemsSketch<T> tgt) {
final int targetK = tgt.getK();
final int sourceK = src.getK();
if ((sourceK % targetK) != 0) {
throw new SketchesArgumentException(
"source.getK() must equal target.getK() * 2^(nonnegative integer).");
}
final int downFactor = sourceK / targetK;
checkIfPowerOf2(downFactor, "source.getK()/target.getK() ratio");
final int lgDownFactor = Integer.numberOfTrailingZeros(downFactor);
final Object[] sourceLevels = src.getCombinedBuffer(); // aliasing is a bit dangerous
final Object[] sourceBaseBuffer = src.getCombinedBuffer(); // aliasing is a bit dangerous
final long nFinal = tgt.getN() + src.getN();
for (int i = 0; i < src.getBaseBufferCount(); i++) {
tgt.update((T) sourceBaseBuffer[i]);
}
ItemsUpdateImpl.maybeGrowLevels(tgt, nFinal);
final Object[] scratchBuf = new Object[2 * targetK];
final Object[] downBuf = new Object[targetK];
long srcBitPattern = src.getBitPattern();
for (int srcLvl = 0; srcBitPattern != 0L; srcLvl++, srcBitPattern >>>= 1) {
if ((srcBitPattern & 1L) > 0L) {
ItemsMergeImpl.justZipWithStride(
sourceLevels, (2 + srcLvl) * sourceK,
downBuf, 0,
targetK,
downFactor);
ItemsUpdateImpl.inPlacePropagateCarry(
srcLvl + lgDownFactor,
(T[]) downBuf, 0,
(T[]) scratchBuf, 0,
false, tgt);
// won't update target.n_ until the very end
}
}
tgt.n_ = nFinal;
assert (tgt.getN() / (2L * targetK)) == tgt.getBitPattern(); // internal consistency check
final T srcMax = src.getMaxValue();
final T srcMin = src.getMinValue();
final T tgtMax = tgt.getMaxValue();
final T tgtMin = tgt.getMinValue();
if ((srcMax != null) && (tgtMax != null)) {
tgt.maxValue_ = (src.getComparator().compare(srcMax, tgtMax) > 0) ? srcMax : tgtMax;
} //only one could be null
else if (tgtMax == null) { //if srcMax were null we would leave tgt alone
tgt.maxValue_ = srcMax;
}
if ((srcMin != null) && (tgtMin != null)) {
tgt.minValue_ = (src.getComparator().compare(srcMin, tgtMin) > 0) ? tgtMin : srcMin;
} //only one could be null
else if (tgtMin == null) { //if srcMin were null we would leave tgt alone
tgt.minValue_ = srcMin;
}
}
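
/**
* Down-samples a sorted source region by the given stride: starting from a uniformly random
* offset in [0, stride), copies every stride-th item of bufSrc into bufC, preserving order.
* @param <T> the data type
* @param bufSrc the source buffer
* @param startSrc starting offset in the source buffer
* @param bufC the output buffer
* @param startC starting offset in the output buffer
* @param kC number of items that should be in the output
* @param stride the down-sampling factor
*/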
private static <T> void justZipWithStride(
final T[] bufSrc, final int startSrc, // input
final T[] bufC, final int startC, // output
final int kC, // number of items that should be in the output
final int stride) {
final int randomOffset = ItemsSketch.rand.nextInt(stride);
final int limC = startC + kC;
for (int a = startSrc + randomOffset, c = startC; c < limC; a += stride, c++) {
bufC[c] = bufSrc[a];
}
}

/**
* blockyTandemMergeSort() is an implementation of top-down merge sort specialized
* for the case where the input contains successive equal-length blocks
* that have already been sorted, so that only the top part of the
* merge tree remains to be executed. Also, two arrays are sorted in tandem: one provides
* the sort keys while the other passively undergoes the same data motion (see tandemMerge()).
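*
* <p>For example (illustrative numbers only): with arrLen = 10 and blkSize = 4 there are
* numblks = 3 pre-sorted blocks; the final block holds only 2 items, which the recursion
* handles by clamping against arrLim.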
* @param <T> the data type
* @param keyArr array of keys
* @param valArr array of values
* @param arrLen length of keyArr and valArr
* @param blkSize size of internal sorted blocks
* @param comparator the comparator for data type T
*/
//also used by ItemsAuxiliary
static <T> void blockyTandemMergeSort(final T[] keyArr, final long[] valArr, final int arrLen,
final int blkSize, final Comparator<? super T> comparator) {
assert blkSize >= 1;
if (arrLen <= blkSize) { return; }
int numblks = arrLen / blkSize;
if ((numblks * blkSize) < arrLen) { numblks += 1; }
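// numblks is now ceil(arrLen / blkSize)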
assert ((numblks * blkSize) >= arrLen);
// duplicate the input in preparation for the "ping-pong" copy-reduction strategy.
final T[] keyTmp = Arrays.copyOf(keyArr, arrLen);
final long[] valTmp = Arrays.copyOf(valArr, arrLen);
blockyTandemMergeSortRecursion(keyTmp, valTmp,
keyArr, valArr,
0, numblks,
blkSize, arrLen, comparator);
}

/**
* blockyTandemMergeSortRecursion() is called by blockyTandemMergeSort().
* In addition to performing the algorithm's top down recursion,
* it manages the buffer swapping that eliminates most copying.
* It also maps the input's pre-sorted blocks into the subarrays
* that are processed by tandemMerge().
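*
* <p>For example, a group of 3 blocks splits into grpLen1 = 1 and grpLen2 = 2; each half is
* recursively sorted into the "src" arrays (roles swapped), and tandemMerge() then merges the
* two sorted halves back into the "dst" arrays.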
* @param <T> the data type
* @param keySrc key source
* @param valSrc value source
* @param keyDst key destination
* @param valDst value destination
* @param grpStart group start, refers to pre-sorted blocks such as block 0, block 1, etc.
* @param grpLen group length, refers to pre-sorted blocks such as block 0, block 1, etc.
* @param blkSize block size
* @param arrLim array limit
* @param comparator to compare keys
*/
private static <T> void blockyTandemMergeSortRecursion(final T[] keySrc, final long[] valSrc,
final T[] keyDst, final long[] valDst, final int grpStart, final int grpLen, // block indices
final int blkSize, final int arrLim, final Comparator<? super T> comparator) {
// Important note: grpStart and grpLen do NOT refer to positions in the underlying array.
// Instead, they refer to the pre-sorted blocks, such as block 0, block 1, etc.
assert (grpLen > 0);
if (grpLen == 1) { return; }
final int grpLen1 = grpLen / 2;
final int grpLen2 = grpLen - grpLen1;
assert (grpLen1 >= 1);
assert (grpLen2 >= grpLen1);
final int grpStart1 = grpStart;
final int grpStart2 = grpStart + grpLen1;
//swap roles of src and dst
blockyTandemMergeSortRecursion(keyDst, valDst,
keySrc, valSrc,
grpStart1, grpLen1, blkSize, arrLim, comparator);
//swap roles of src and dst
blockyTandemMergeSortRecursion(keyDst, valDst,
keySrc, valSrc,
grpStart2, grpLen2, blkSize, arrLim, comparator);
// here we convert indices of blocks into positions in the underlying array.
final int arrStart1 = grpStart1 * blkSize;
final int arrStart2 = grpStart2 * blkSize;
final int arrLen1 = grpLen1 * blkSize;
int arrLen2 = grpLen2 * blkSize;
// special case for the final block which might be shorter than blkSize.
if ((arrStart2 + arrLen2) > arrLim) {
arrLen2 = arrLim - arrStart2;
}
tandemMerge(keySrc, valSrc,
arrStart1, arrLen1,
arrStart2, arrLen2,
keyDst, valDst,
arrStart1, comparator); // arrStart1 is passed as arrStart3, the destination offset
}

/**
* Performs two merges in tandem. One of them provides the sort keys
* while the other one passively undergoes the same data motion.
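*
* <p>For example, merging keys {B, D} (values {1, 2}) with keys {A, C} (values {3, 4})
* yields keys {A, B, C, D} and values {3, 1, 4, 2}: each value travels with its key.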
* @param <T> the data type
* @param keySrc key source
* @param valSrc value source
* @param arrStart1 Array 1 start offset
* @param arrLen1 Array 1 length
* @param arrStart2 Array 2 start offset
* @param arrLen2 Array 2 length
* @param keyDst key destination
* @param valDst value destination
* @param arrStart3 Array 3 start offset
* @param comparator to compare keys
*/
private static <T> void tandemMerge(final T[] keySrc, final long[] valSrc,
final int arrStart1, final int arrLen1,
final int arrStart2, final int arrLen2,
final T[] keyDst, final long[] valDst,
final int arrStart3, final Comparator<? super T> comparator) {
final int arrStop1 = arrStart1 + arrLen1;
final int arrStop2 = arrStart2 + arrLen2;
int i1 = arrStart1;
int i2 = arrStart2;
int i3 = arrStart3;
while ((i1 < arrStop1) && (i2 < arrStop2)) {
if (comparator.compare(keySrc[i2], keySrc[i1]) < 0) {
keyDst[i3] = keySrc[i2];
valDst[i3] = valSrc[i2];
i3++; i2++;
} else {
keyDst[i3] = keySrc[i1];
valDst[i3] = valSrc[i1];
i3++; i1++;
}
}
if (i1 < arrStop1) {
arraycopy(keySrc, i1, keyDst, i3, arrStop1 - i1);
arraycopy(valSrc, i1, valDst, i3, arrStop1 - i1);
} else {
assert i2 < arrStop2;
arraycopy(keySrc, i2, keyDst, i3, arrStop2 - i2);
arraycopy(valSrc, i2, valDst, i3, arrStop2 - i2);
}
}
}