blob: cc00d41de46022f5446fc242ac473d25d86bddcd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.datasketches.hll;
import static org.apache.datasketches.Util.invPow2;
import static org.apache.datasketches.hll.HllUtil.EMPTY;
import static org.apache.datasketches.hll.PreambleUtil.HLL_BYTE_ARR_START;
import static org.apache.datasketches.hll.TgtHllType.HLL_8;
import org.apache.datasketches.SketchesArgumentException;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
/**
* This performs union operations for all HllSketches. This union operator can be configured to be
* on or off heap. The source sketch given to this union using the {@link #update(HllSketch)} can
* be configured with any precision value <i>lgConfigK</i> (from 4 to 21), any <i>TgtHllType</i>
* (HLL_4, HLL_6, HLL_8), and either on or off-heap; and it can be in either of the sparse modes
* (<i>LIST</i> or <i>SET</i>), or the dense mode (<i>HLL</i>).
*
* <p>Although the API for this union operator parallels many of the methods of the
* <i>HllSketch</i>, the behavior of the union operator has some fundamental differences.</p>
*
* <p>First, this union operator is configured with a <i>lgMaxK</i> instead of the normal
* <i>lgConfigK</i>. Generally, this union operator will inherit the lowest <i>lgConfigK</i>
* less than <i>lgMaxK</i> that it has seen. However, the <i>lgConfigK</i> of incoming sketches that
* are still in sparse are ignored. The <i>lgMaxK</i> provides the user the ability to specify the
* largest maximum size for the union operation.
*
* <p>Second, the user cannot specify the {@link TgtHllType} as an input parameter to the union.
* Instead, it is specified for the sketch returned with {@link #getResult(TgtHllType)}.
*
* @author Lee Rhodes
* @author Kevin Lang
*/
public class Union extends BaseHllSketch {
final int lgMaxK;
private final HllSketch gadget;
/**
* Construct this Union operator with the default maximum log-base-2 of <i>K</i>.
*/
public Union() {
lgMaxK = HllSketch.DEFAULT_LG_K;
gadget = new HllSketch(lgMaxK, HLL_8);
}
/**
* Construct this Union operator with a given maximum log-base-2 of <i>K</i>.
* @param lgMaxK the desired maximum log-base-2 of <i>K</i>. This value must be
* between 4 and 21 inclusively.
*/
public Union(final int lgMaxK) {
this.lgMaxK = HllUtil.checkLgK(lgMaxK);
gadget = new HllSketch(lgMaxK, HLL_8);
}
/**
* Construct this Union operator with a given maximum log-base-2 of <i>K</i> and the given
* WritableMemory as the destination for this Union. This WritableMemory is usually configured
* for off-heap memory. What remains on the java heap is a thin wrapper object that reads and
* writes to the given WritableMemory.
*
* <p>The given <i>dstMem</i> is checked for the required capacity as determined by
* {@link HllSketch#getMaxUpdatableSerializationBytes(int, TgtHllType)}.
* @param lgMaxK the desired maximum log-base-2 of <i>K</i>. This value must be
* between 4 and 21 inclusively.
* @param dstMem the destination memory for the sketch.
*/
public Union(final int lgMaxK, final WritableMemory dstMem) {
this.lgMaxK = HllUtil.checkLgK(lgMaxK);
gadget = new HllSketch(lgMaxK, HLL_8, dstMem);
}
Union(final HllSketch sketch) {
lgMaxK = sketch.getLgConfigK();
final TgtHllType tgtHllType = sketch.getTgtHllType();
if (tgtHllType != TgtHllType.HLL_8) {
throw new SketchesArgumentException("Union can only wrap writable HLL_8 sketches.");
}
//This should not happen, this is in case it does.
if (sketch.hllSketchImpl.isRebuildCurMinNumKxQFlag()) {
throw new SketchesArgumentException
("Incomming sketch is corrupted, Rebuild_CurMin_Num_KxQ flag is set.");
}
gadget = sketch;
}
/**
* Construct a union operator populated with the given byte array image of an HllSketch.
* @param byteArray the given byte array
* @return a union operator populated with the given byte array image of an HllSketch.
*/
public static final Union heapify(final byte[] byteArray) {
return heapify(Memory.wrap(byteArray));
}
/**
* Construct a union operator populated with the given Memory image of an HllSketch.
* @param mem the given Memory
* @return a union operator populated with the given Memory image of an HllSketch.
*/
public static final Union heapify(final Memory mem) {
final int lgK = HllUtil.checkLgK(mem.getByte(PreambleUtil.LG_K_BYTE));
final HllSketch sk = HllSketch.heapify(mem);
final Union union = new Union(lgK);
union.update(sk);
return union;
}
/**
* Wraps the given WritableMemory, which must be a image of a valid updatable HLL_8 sketch,
* and may have data. What remains on the java heap is a
* thin wrapper object that reads and writes to the given WritableMemory, which, depending on
* how the user configures the WritableMemory, may actually reside on the Java heap or off-heap.
*
* <p>The given <i>dstMem</i> is checked for the required capacity as determined by
* {@link HllSketch#getMaxUpdatableSerializationBytes(int, TgtHllType)}, and for the correct type.
* @param wmem an writable image of a valid sketch with data.
* @return a Union operator where the sketch data is in the given dstMem.
*/
public static final Union writableWrap(final WritableMemory wmem) {
return new Union(HllSketch.writableWrap(wmem));
}
@Override
public double getCompositeEstimate() {
if (gadget.hllSketchImpl.isRebuildCurMinNumKxQFlag()) {
rebuildCurMinNumKxQ((AbstractHllArray)(gadget.hllSketchImpl));
}
return gadget.hllSketchImpl.getCompositeEstimate();
}
@Override
CurMode getCurMode() {
return gadget.getCurMode();
}
@Override
public int getCompactSerializationBytes() {
return gadget.getCompactSerializationBytes();
}
@Override
public double getEstimate() {
if (gadget.hllSketchImpl.isRebuildCurMinNumKxQFlag()) {
rebuildCurMinNumKxQ((AbstractHllArray)(gadget.hllSketchImpl));
}
return gadget.getEstimate();
}
/**
* Gets the effective <i>lgConfigK</i> for the union operator, which may be less than
* <i>lgMaxK</i>.
* @return the <i>lgConfigK</i>.
*/
@Override
public int getLgConfigK() {
return gadget.getLgConfigK();
}
@Override
public double getLowerBound(final int numStdDev) {
if (gadget.hllSketchImpl.isRebuildCurMinNumKxQFlag()) {
rebuildCurMinNumKxQ((AbstractHllArray)(gadget.hllSketchImpl));
}
return gadget.getLowerBound(numStdDev);
}
/**
* Returns the maximum size in bytes that this union operator can grow to given a lgK.
*
* @param lgK The maximum Log2 of K for this union operator. This value must be
* between 4 and 21 inclusively.
* @return the maximum size in bytes that this union operator can grow to.
*/
public static int getMaxSerializationBytes(final int lgK) {
return HllSketch.getMaxUpdatableSerializationBytes(lgK, TgtHllType.HLL_8);
}
/**
* Return the result of this union operator as an HLL_4 sketch.
* @return the result of this union operator as an HLL_4 sketch.
*/
public HllSketch getResult() {
if (gadget.hllSketchImpl.isRebuildCurMinNumKxQFlag()) {
rebuildCurMinNumKxQ((AbstractHllArray)(gadget.hllSketchImpl));
}
return gadget.copyAs(HllSketch.DEFAULT_HLL_TYPE);
}
/**
* Return the result of this union operator with the specified {@link TgtHllType}
* @param tgtHllType the TgtHllType enum
* @return the result of this union operator with the specified TgtHllType
*/
public HllSketch getResult(final TgtHllType tgtHllType) {
if (gadget.hllSketchImpl.isRebuildCurMinNumKxQFlag()) {
rebuildCurMinNumKxQ((AbstractHllArray)(gadget.hllSketchImpl));
}
return gadget.copyAs(tgtHllType);
}
@Override
public TgtHllType getTgtHllType() {
return TgtHllType.HLL_8;
}
@Override
public int getUpdatableSerializationBytes() {
return gadget.getUpdatableSerializationBytes();
}
@Override
public double getUpperBound(final int numStdDev) {
if (gadget.hllSketchImpl.isRebuildCurMinNumKxQFlag()) {
rebuildCurMinNumKxQ((AbstractHllArray)(gadget.hllSketchImpl));
}
return gadget.getUpperBound(numStdDev);
}
@Override
public boolean isCompact() {
return gadget.isCompact();
}
@Override
public boolean isEmpty() {
return gadget.isEmpty();
}
@Override
public boolean isMemory() {
return gadget.isMemory();
}
@Override
public boolean isOffHeap() {
return gadget.isOffHeap();
}
@Override
boolean isOutOfOrderFlag() {
return gadget.isOutOfOrderFlag();
}
@Override
public boolean isSameResource(final Memory mem) {
return gadget.isSameResource(mem);
}
boolean isRebuildCurMinNumKxQFlag() {
return gadget.hllSketchImpl.isRebuildCurMinNumKxQFlag();
}
void putRebuildCurMinNumKxQFlag(final boolean rebuild) {
gadget.hllSketchImpl.putRebuildCurMinNumKxQFlag(rebuild);
}
/**
* Resets to empty and retains the current lgK, but does not change the configured value of
* lgMaxK.
*/
@Override
public void reset() {
gadget.reset();
}
/**
* Gets the serialization of this union operator as a byte array in compact form, which is
* designed to be heapified only. It is not directly updatable.
* For the Union operator, this is the serialization of the internal state of
* the union operator as a sketch.
* @return the serialization of this union operator as a byte array.
*/
@Override
public byte[] toCompactByteArray() {
if (gadget.hllSketchImpl.isRebuildCurMinNumKxQFlag()) {
rebuildCurMinNumKxQ((AbstractHllArray)(gadget.hllSketchImpl));
}
return gadget.toCompactByteArray();
}
@Override
public byte[] toUpdatableByteArray() {
if (gadget.hllSketchImpl.isRebuildCurMinNumKxQFlag()) {
rebuildCurMinNumKxQ((AbstractHllArray)(gadget.hllSketchImpl));
}
return gadget.toUpdatableByteArray();
}
@Override
public String toString(final boolean summary, final boolean hllDetail,
final boolean auxDetail, final boolean all) {
return gadget.toString(summary, hllDetail, auxDetail, all);
}
/**
* Update this union operator with the given sketch.
* @param sketch the given sketch.
*/
public void update(final HllSketch sketch) {
//This should not happen, this is in case it does.
if (sketch.hllSketchImpl.isRebuildCurMinNumKxQFlag()) {
throw new SketchesArgumentException
("Incomming sketch is corrupted, Rebuild_CurMin_Num_KxQ flag is set.");
}
gadget.hllSketchImpl = unionImpl(sketch, gadget, lgMaxK);
}
@Override
void couponUpdate(final int coupon) {
if (coupon == EMPTY) { return; }
gadget.hllSketchImpl = gadget.hllSketchImpl.couponUpdate(coupon);
}
// Union operator logic
/**
* Union the given source and destination sketches. This static method examines the state of
* the current internal gadget and the incoming sketch and determines the optimum way to
* perform the union. This may involve swapping, downsampling, transforming, and / or
* copying one of the arguments and may completely replace the internals of the union.
*
* <p>If the union gadget is empty, the source sketch is effectively copied to the union gadget
* after any required transformations.
*
* <p>The direction of the update is required if the union gadget is in LIST or SET mode, and the
* source sketch is in HLL mode. This is done to maintain maximum accuracy of the union process.
*
* <p>The source sketch is downsampled if the source LgK is larger than maxLgK and in HLL mode.
*
* <p>The union gadget is downsampled if both source and union gadget are in HLL mode
* and the source LgK <b>less than</b> the union gadget LgK.
*
* @param source the given incoming sketch, which is not be modified.
* @param gadget the given gadget sketch, which has a target of HLL_8 and may be modified.
* @param lgMaxK the maximum value of log2 K for this union.
* @return the union of the two sketches in the form of the internal HllSketchImpl, which is
* always in HLL_8 form.
*/
private static HllSketchImpl unionImpl(final HllSketch source, final HllSketch gadget,
final int lgMaxK) {
assert gadget.getTgtHllType() == HLL_8;
if ((source == null) || source.isEmpty()) { return gadget.hllSketchImpl; }
final int srcLgK = source.getLgConfigK();
final int gadgetLgK = gadget.getLgConfigK();
final boolean srcIsMem = source.isMemory();
final boolean gdtIsMem = gadget.isMemory();
//Source is either LIST or SET mode
if (source.getCurMode() == CurMode.LIST ) {
source.mergeTo(gadget);
gadget.putOutOfOrderFlag(gadget.isOutOfOrderFlag() | source.isOutOfOrderFlag());
return gadget.hllSketchImpl;
}
if (source.getCurMode() == CurMode.SET ) {
if (gadget.isEmpty() && (!srcIsMem) && (!gdtIsMem) && (srcLgK == gadgetLgK)) {
gadget.hllSketchImpl = source.copyAs(HLL_8).hllSketchImpl;
return gadget.hllSketchImpl;
}
source.mergeTo(gadget);
gadget.putOutOfOrderFlag(true);
return gadget.hllSketchImpl;
}
//Hereafter, the srcImpl is in HLL mode.
final int bit0 = gdtIsMem ? 1 : 0;
final int bits1_2 = (gadget.isEmpty() ? 3 : gadget.getCurMode().ordinal()) << 1;
final int bit3 = (srcLgK < gadgetLgK) ? 8 : 0;
final int bit4 = (srcLgK > lgMaxK) ? 16 : 0;
final int sw = bit4 | bit3 | bits1_2 | bit0;
HllSketchImpl hllSketchImpl = null; //never returned as null
switch (sw) {
case 0: //src <= max, src >= gdt, gdtLIST, gdtHeap, reverse merge, no downsample, ooof=OR
case 8: //src <= max, src < gdt, gdtLIST, gdtHeap, reverse merge, no downsample, ooof=OR
{
final HllSketch srcHll8Heap = source.copyAs(HLL_8); //copy src to Heap
gadget.mergeTo(srcHll8Heap); //merge gdt(Hll8,heap,list) -> src(Hll8,heap,hll), autofold
final HllSketch gdtTmp = srcHll8Heap; //replace gdt
gdtTmp.putOutOfOrderFlag(gdtTmp.isOutOfOrderFlag() | source.isOutOfOrderFlag());
hllSketchImpl = gdtTmp.hllSketchImpl;
break;
}
case 1: //src <= max, src >= gdt, gdtLIST, gdtMemory, reverse merge, no downsample, ooof=OR
case 9: //src <= max, src < gdt, gdtLIST, gdtMemory, reverse merge, no downsample, ooof=OR
{
final HllSketch srcHll8Heap = source.copyAs(HLL_8); //copy src to Heap
gadget.mergeTo(srcHll8Heap); //merge gdt(Hll8,mem,list) -> src(Hll8,heap,hll), autofold
final WritableMemory wmem = gadget.getWritableMemory(); //use the gdt wmem
final byte[] byteArr = srcHll8Heap.toUpdatableByteArray(); //serialize srcCopy
wmem.putByteArray(0, byteArr, 0, byteArr.length); //replace old data with new
final HllSketch gdtTmp = HllSketch.writableWrap(wmem); //wrap & replace gdt
gdtTmp.putOutOfOrderFlag(gdtTmp.isOutOfOrderFlag() | source.isOutOfOrderFlag());
hllSketchImpl = gdtTmp.hllSketchImpl;
break;
}
case 2: //src <= max, src >= gdt, gdtSET, gdtHeap, reverse merge, no downsample, ooof=True
case 10: //src <= max, src < gdt, gdtSET, gdtHeap, reverse merge, no downsample, ooof=True
{
final HllSketch srcHll8Heap = source.copyAs(HLL_8); //copy src to Heap
gadget.mergeTo(srcHll8Heap); //merge gdt(Hll8,heap,set) -> src(Hll8,heap,hll), autofold
final HllSketch gdtTmp = srcHll8Heap; //replace gdt
gdtTmp.putOutOfOrderFlag(true);
hllSketchImpl = gdtTmp.hllSketchImpl;
break;
}
case 3: //src <= max, src >= gdt, gdtSET, gdtMemory, reverse merge, no downsample, ooof=True
case 11: //src <= max, src < gdt, gdtSET, gdtMemory, reverse merge, no downsample, ooof=True
{
final HllSketch srcHll8Heap = source.copyAs(HLL_8); //copy src to Heap
gadget.mergeTo(srcHll8Heap); //merge gdt(Hll8,mem,set) -> src(Hll8,heap,hll), autofold
final WritableMemory wmem = gadget.getWritableMemory(); //use the gdt wmem
final byte[] byteArr = srcHll8Heap.toUpdatableByteArray(); //serialize srcCopy
wmem.putByteArray(0, byteArr, 0, byteArr.length); //replace old data with new
final HllSketch gdtTmp = HllSketch.writableWrap(wmem); //wrap & replace gdt
gdtTmp.putOutOfOrderFlag(true);
hllSketchImpl = gdtTmp.hllSketchImpl;
break;
}
case 4: //src <= max, src >= gdt, gdtHLL, gdtHeap, forward merge, no downsample, ooof=True
case 20: //src > max, src >= gdt, gdtHLL, gdtHeap, forward merge, no downsample, ooof=True
case 5: //src <= max, src >= gdt, gdtHLL, gdtMemory, forward merge, no downsample, ooof=True
case 21: //src > max, src >= gdt, gdtHLL, gdtMemory, forward merge, no downsample, ooof=True
{
//merge src(Hll4,6,8,heap/mem,Mode=HLL) -> gdt(Hll8,heap,Mode=HLL), may autofold
mergeHlltoHLLmode(source, gadget, srcLgK, gadgetLgK, srcIsMem, gdtIsMem);
gadget.putOutOfOrderFlag(true);
hllSketchImpl = gadget.hllSketchImpl;
break;
}
case 6: //src <= max, src >= gdt, gdtEmpty, gdtHeap, replace, no downsample, ooof=Src
case 14: //src <= max, src < gdt, gdtEmpty, gdtHeap, replace, no downsample, ooof=Src
{
final HllSketch srcHll8Heap = source.copyAs(HLL_8); //copy src to Heap
//ooof is already what source is.
hllSketchImpl = srcHll8Heap.hllSketchImpl;
break;
}
case 7: //src <= max, src >= gdt, gdtEmpty, gdtMemory, replace mem, no downsample, ooof=Src
case 15: //src <= max, src < gdt, gdtEmpty, gdtMemory, replace mem, no downsample, ooof=Src
{
final HllSketch srcHll8Heap = source.copyAs(HLL_8); //copy src to Heap
final WritableMemory wmem = gadget.getWritableMemory(); //use the gdt wmem
final byte[] byteArr = srcHll8Heap.toUpdatableByteArray(); //serialize srcCopy
wmem.putByteArray(0, byteArr, 0, byteArr.length); //replace old data with new
final HllSketch gdtTmp = HllSketch.writableWrap(wmem); //wrap & replace gdt
//ooof is already what source is.
hllSketchImpl = gdtTmp.hllSketchImpl;
break;
}
case 12: //src <= max, src < gdt, gdtHLL, gdtHeap, fwd merge/repl, downsample Gdt, ooof=True
{
final HllSketch gdtHll8Heap = downsample(gadget, srcLgK); //downsample gdt to srcLgK
//merge src(Hll4,6,8;heap/mem,Mode=HLL) -> gdt(Hll8,heap,hll)
mergeHlltoHLLmode(source, gdtHll8Heap, srcLgK, gadgetLgK, srcIsMem, false);
gdtHll8Heap.putOutOfOrderFlag(true);
hllSketchImpl = gdtHll8Heap.hllSketchImpl;
break;
}
case 13: //src <= max, src < gdt, gdtHLL, gdtMemory, fwd merge/repl mem, downsample Gdt, ooof=True
{
final HllSketch gdtHll8Heap = downsample(gadget, srcLgK); //downsample gdt to srcLgK
//merge src(Hll4,6,8;heap/mem;Mode=HLL) -> gdt(Hll8,heap,Mode=HLL)
mergeHlltoHLLmode(source, gdtHll8Heap, srcLgK, gadgetLgK, srcIsMem, false);
final WritableMemory wmem = gadget.getWritableMemory(); //use the gdt wmem
final byte[] byteArr = gdtHll8Heap.toUpdatableByteArray(); //serialize gdtCopy
wmem.putByteArray(0, byteArr, 0, byteArr.length); //replace old data with new
final HllSketch gdtTmp = HllSketch.writableWrap(wmem); //wrap & replace gdt
gdtTmp.putOutOfOrderFlag(true);
hllSketchImpl = gdtTmp.hllSketchImpl;
break;
}
case 16: //src > max, src >= gdt, gdtList, gdtHeap, reverse merge, downsample Src, ooof=OR
{
final HllSketch srcHll8Heap = downsample(source, lgMaxK); //downsample src to Max
gadget.mergeTo(srcHll8Heap); //merge gdt(Hll8,heap,list) -> src(Hll8,heap,hll)
final HllSketch gdtTmp = srcHll8Heap; //replace gdt
gdtTmp.putOutOfOrderFlag(gdtTmp.isOutOfOrderFlag() | source.isOutOfOrderFlag());
hllSketchImpl = gdtTmp.hllSketchImpl;
break;
}
case 17: //src > max, src >= gdt, gdtList, gdtMemory, rev merge/repl mem, downsample Src, ooof=OR
{
final HllSketch srcHll8Heap = downsample(source, lgMaxK); //downsample src to Max
gadget.mergeTo(srcHll8Heap); //merge gdt(Hll8,mem,list) -> src(Hll8,heap,hll)
final WritableMemory wmem = gadget.getWritableMemory(); //use the gdt wmem
final byte[] byteArr = srcHll8Heap.toUpdatableByteArray(); //serialize srcCopy
wmem.putByteArray(0, byteArr, 0, byteArr.length); //replace old data with new
final HllSketch gdtTmp = HllSketch.writableWrap(wmem); //wrap & replace gdt
gdtTmp.putOutOfOrderFlag(gdtTmp.isOutOfOrderFlag() | source.isOutOfOrderFlag());
hllSketchImpl = gdtTmp.hllSketchImpl;
break;
}
case 18: //src > max, src >= gdt, gdtSet, gdtHeap, rev merge, downsample Src, ooof=True
{
final HllSketch srcHll8Heap = downsample(source, lgMaxK); //downsample src to Max
gadget.mergeTo(srcHll8Heap); //merge gdt(Hll8,heap,set) -> src(Hll8,heap,hll)
final HllSketch gdtTmp = srcHll8Heap; //replace gdt
gdtTmp.putOutOfOrderFlag(true);
hllSketchImpl = gdtTmp.hllSketchImpl;
break;
}
case 19: //src > max, src >= gdt, gdtSet, gdtMemory, rev merge/rep mem, downsample Src, ooof=True
{
final HllSketch srcHll8Heap = downsample(source, lgMaxK); //downsample src to Max
gadget.mergeTo(srcHll8Heap); //merge gdt(Hll8,mem,set) -> src(Hll8,heap,hll)
final WritableMemory wmem = gadget.getWritableMemory(); //use the gdt wmem
final byte[] byteArr = srcHll8Heap.toUpdatableByteArray(); //serialize srcCopy
wmem.putByteArray(0, byteArr, 0, byteArr.length); //replace old data with new
final HllSketch gdtTmp = HllSketch.writableWrap(wmem); //wrap & replace gdt
gdtTmp.putOutOfOrderFlag(true);
hllSketchImpl = gdtTmp.hllSketchImpl;
break;
}
case 22: //src > max, src >= gdt, gdtEmpty, gdtHeap, replace, downsample Src, ooof=Src
{
final HllSketch srcHll8Heap = downsample(source, lgMaxK); //downsample src to Max
hllSketchImpl = srcHll8Heap.hllSketchImpl; //replace gdt
//ooof is already what source is.
break;
}
case 23: //src > max, src >= gdt, gdtEmpty, gdtMemory, replace mem, downsample Src, ooof=Src
{
final HllSketch srcHll8Heap = downsample(source, lgMaxK); //downsample src to Max
final WritableMemory wmem = gadget.getWritableMemory(); //use the gdt wmem
final byte[] byteArr = srcHll8Heap.toUpdatableByteArray(); //serialize srcCopy
wmem.putByteArray(0, byteArr, 0, byteArr.length); //replace old data with new
hllSketchImpl = HllSketch.writableWrap(wmem).hllSketchImpl;//wrap & replace gdt
//ooof is already what source is.
break;
}
//default: return gadget.hllSketchImpl; //not possible
}
return hllSketchImpl;
}
private static void mergeHlltoHLLmode(final HllSketch src, final HllSketch tgt,
final int srcLgK, final int tgtLgK, final boolean srcIsMem, final boolean tgtIsMem) {
final int sw = (tgtIsMem ? 1 : 0) | (srcIsMem ? 2 : 0)
| ((srcLgK > tgtLgK) ? 4 : 0) | ((src.getTgtHllType() != HLL_8) ? 8 : 0);
switch (sw) {
case 0: { //HLL_8, srcLgK=tgtLgK, src=heap, tgt=heap
final int srcK = 1 << srcLgK;
final byte[] srcArr = ((Hll8Array) src.hllSketchImpl).hllByteArr;
final byte[] tgtArr = ((Hll8Array) tgt.hllSketchImpl).hllByteArr;
for (int i = 0; i < srcK; i++) {
final byte srcV = srcArr[i];
final byte tgtV = tgtArr[i];
tgtArr[i] = (srcV > tgtV) ? srcV : tgtV;
}
break;
}
case 1: { //HLL_8, srcLgK=tgtLgK, src=heap, tgt=mem
final int srcK = 1 << srcLgK;
final byte[] srcArr = ((Hll8Array) src.hllSketchImpl).hllByteArr;
final WritableMemory tgtMem = tgt.getWritableMemory();
for (int i = 0; i < srcK; i++) {
final byte srcV = srcArr[i];
final byte tgtV = tgtMem.getByte(HLL_BYTE_ARR_START + i);
tgtMem.putByte(HLL_BYTE_ARR_START + i, (srcV > tgtV) ? srcV : tgtV);
}
break;
}
case 2: { //HLL_8, srcLgK=tgtLgK, src=mem, tgt=heap
final int srcK = 1 << srcLgK;
final Memory srcMem = src.getMemory();
final byte[] tgtArr = ((Hll8Array) tgt.hllSketchImpl).hllByteArr;
for (int i = 0; i < srcK; i++) {
final byte srcV = srcMem.getByte(HLL_BYTE_ARR_START + i);
final byte tgtV = tgtArr[i];
tgtArr[i] = (srcV > tgtV) ? srcV : tgtV;
}
break;
}
case 3: { //HLL_8, srcLgK=tgtLgK, src=mem, tgt=mem
final int srcK = 1 << srcLgK;
final Memory srcMem = src.getMemory();
final WritableMemory tgtMem = tgt.getWritableMemory();
for (int i = 0; i < srcK; i++) {
final byte srcV = srcMem.getByte(HLL_BYTE_ARR_START + i);
final byte tgtV = tgtMem.getByte(HLL_BYTE_ARR_START + i);
tgtMem.putByte(HLL_BYTE_ARR_START + i, (srcV > tgtV) ? srcV : tgtV);
}
break;
}
case 4: { //HLL_8, srcLgK>tgtLgK, src=heap, tgt=heap
final int srcK = 1 << srcLgK;
final int tgtKmask = (1 << tgtLgK) - 1;
final byte[] srcArr = ((Hll8Array) src.hllSketchImpl).hllByteArr;
final byte[] tgtArr = ((Hll8Array) tgt.hllSketchImpl).hllByteArr;
for (int i = 0; i < srcK; i++) {
final byte srcV = srcArr[i];
final int j = i & tgtKmask;
final byte tgtV = tgtArr[j];
tgtArr[j] = (srcV > tgtV) ? srcV : tgtV;
}
break;
}
case 5: { //HLL_8, srcLgK>tgtLgK, src=heap, tgt=mem
final int srcK = 1 << srcLgK;
final int tgtKmask = (1 << tgtLgK) - 1;
final byte[] srcArr = ((Hll8Array) src.hllSketchImpl).hllByteArr;
final WritableMemory tgtMem = tgt.getWritableMemory();
for (int i = 0; i < srcK; i++) {
final byte srcV = srcArr[i];
final int j = i & tgtKmask;
final byte tgtV = tgtMem.getByte(HLL_BYTE_ARR_START + j);
tgtMem.putByte(HLL_BYTE_ARR_START + j, (srcV > tgtV) ? srcV : tgtV);
}
break;
}
case 6: { //HLL_8, srcLgK>tgtLgK, src=mem, tgt=heap
final int srcK = 1 << srcLgK;
final int tgtKmask = (1 << tgtLgK) - 1;
final Memory srcMem = src.getMemory();
final byte[] tgtArr = ((Hll8Array) tgt.hllSketchImpl).hllByteArr;
for (int i = 0; i < srcK; i++) {
final byte srcV = srcMem.getByte(HLL_BYTE_ARR_START + i);
final int j = i & tgtKmask;
final byte tgtV = tgtArr[j];
tgtArr[j] = (srcV > tgtV) ? srcV : tgtV;
}
break;
}
case 7: { //HLL_8, srcLgK>tgtLgK, src=mem, tgt=mem
final int srcK = 1 << srcLgK;
final int tgtKmask = (1 << tgtLgK) - 1;
final Memory srcMem = src.getMemory();
final WritableMemory tgtMem = tgt.getWritableMemory();
for (int i = 0; i < srcK; i++) {
final byte srcV = srcMem.getByte(HLL_BYTE_ARR_START + i);
final int j = i & tgtKmask;
final byte tgtV = tgtMem.getByte(HLL_BYTE_ARR_START + j);
tgtMem.putByte(HLL_BYTE_ARR_START + j, (srcV > tgtV) ? srcV : tgtV);
}
break;
}
case 8: case 9: case 10: case 11:
{
//!HLL_8, srcLgK=tgtLgK, src=heap/mem, tgt=heap/mem
final int srcK = 1 << srcLgK;
final AbstractHllArray srcAbsHllArr = (AbstractHllArray)(src.hllSketchImpl);
final AbstractHllArray tgtAbsHllArr = (AbstractHllArray)(tgt.hllSketchImpl);
for (int i = 0; i < srcK; i++) {
final int srcV = srcAbsHllArr.getSlotValue(i);
tgtAbsHllArr.updateSlotNoKxQ(i, srcV);
}
break;
}
case 12: case 13: case 14: case 15:
{
//!HLL_8, srcLgK>tgtLgK, src=heap/mem, tgt=heap/mem
final int srcK = 1 << srcLgK;
final int tgtKmask = (1 << tgtLgK) - 1;
final AbstractHllArray srcAbsHllArr = (AbstractHllArray)(src.hllSketchImpl);
final AbstractHllArray tgtAbsHllArr = (AbstractHllArray)(tgt.hllSketchImpl);
for (int i = 0; i < srcK; i++) {
final int srcV = srcAbsHllArr.getSlotValue(i);
final int j = i & tgtKmask;
tgtAbsHllArr.updateSlotNoKxQ(j, srcV);
}
break;
}
}
tgt.hllSketchImpl.putRebuildCurMinNumKxQFlag(true);
}
//Used by union operator. Always copies or downsamples to Heap HLL_8.
//Caller must ultimately manage oooFlag, as caller has more context.
/**
* Copies or downsamples the given candidate HLLmode sketch to tgtLgK, HLL_8, on the heap.
*
* @param candidate the HllSketch to downsample, must be in HLL mode.
* @param tgtLgK the LgK to downsample to.
* @return the downsampled HllSketch.
*/
private static final HllSketch downsample(final HllSketch candidate, final int tgtLgK) {
final AbstractHllArray candArr = (AbstractHllArray) candidate.hllSketchImpl;
final HllArray tgtHllArr = HllArray.newHeapHll(tgtLgK, TgtHllType.HLL_8);
final PairIterator candItr = candArr.iterator();
while (candItr.nextValid()) {
tgtHllArr.couponUpdate(candItr.getPair()); //rebuilds KxQ, etc.
}
//both of these are required for isomorphism
tgtHllArr.putHipAccum(candArr.getHipAccum());
tgtHllArr.putOutOfOrderFlag(candidate.isOutOfOrderFlag());
tgtHllArr.putRebuildCurMinNumKxQFlag(false);
return new HllSketch(tgtHllArr);
}
//Used to rebuild curMin, numAtCurMin and KxQ registers, due to high performance merge operation
//performed in 1st switch cases 4, 5, 12, 13, 20, 21
private static final void rebuildCurMinNumKxQ(final AbstractHllArray absHllArr) {
int curMin = 64;
int numAtCurMin = 0;
double kxq0 = 1 << absHllArr.getLgConfigK();
double kxq1 = 0;
final PairIterator itr = absHllArr.iterator();
while (itr.nextAll()) {
final int v = itr.getValue();
if (v > 0) {
if (v < 32) { kxq0 += invPow2(v) - 1.0; }
else { kxq1 += invPow2(v) - 1.0; }
}
if (v > curMin) { continue; }
if (v < curMin) {
curMin = v;
numAtCurMin = 1;
} else {
numAtCurMin++;
}
}
absHllArr.putKxQ0(kxq0);
absHllArr.putKxQ1(kxq1);
absHllArr.putCurMin(curMin);
absHllArr.putNumAtCurMin(numAtCurMin);
absHllArr.putRebuildCurMinNumKxQFlag(false);
//HipAccum is not affected
}
}