| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.hll; |
| |
| import com.fasterxml.jackson.annotation.JsonValue; |
| import com.google.common.primitives.UnsignedBytes; |
| import org.apache.druid.java.util.common.IAE; |
| import org.apache.druid.java.util.common.ISE; |
| |
| import javax.annotation.Nullable; |
| import java.nio.ByteBuffer; |
| |
| /** |
| * Implements the HyperLogLog cardinality estimator described in: |
| * |
| * http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf |
| * |
| * Run this code to see a simple indication of expected errors based on different m values: |
| * |
| * <code> |
| * for (int i = 1; i < 20; ++i) { |
| * System.out.printf("i[%,d], val[%,d] => error[%f%%]%n", i, 2 << i, 104 / Math.sqrt(2 << i)); |
| * } |
| * </code> |
| * |
| * This class is *not* multi-threaded. It can be passed among threads, but it is written with the assumption that |
| * only one thread is ever calling methods on it. |
| * |
| * If you have multiple threads calling methods on this concurrently, I hope you manage to get correct behavior. |
| * |
| * Note that despite the non-thread-safety of this class, it is actually currently used by multiple threads during |
| * realtime indexing. HyperUniquesAggregator's "aggregate" and "get" methods can be called simultaneously by |
| * OnheapIncrementalIndex, since its "doAggregate" and "getMetricObjectValue" methods are not synchronized. So, watch |
| * out for that. |
| */ |
| public abstract class HyperLogLogCollector implements Comparable<HyperLogLogCollector> |
| { |
| public static final int DENSE_THRESHOLD = 128; |
| public static final int BITS_FOR_BUCKETS = 11; |
| public static final int NUM_BUCKETS = 1 << BITS_FOR_BUCKETS; |
| public static final int NUM_BYTES_FOR_BUCKETS = NUM_BUCKETS / 2; |
| |
| private static final double TWO_TO_THE_SIXTY_FOUR = Math.pow(2, 64); |
| private static final double ALPHA = 0.7213 / (1 + 1.079 / NUM_BUCKETS); |
| |
| public static final double LOW_CORRECTION_THRESHOLD = (5 * NUM_BUCKETS) / 2.0d; |
| public static final double HIGH_CORRECTION_THRESHOLD = TWO_TO_THE_SIXTY_FOUR / 30.0d; |
| public static final double CORRECTION_PARAMETER = ALPHA * NUM_BUCKETS * NUM_BUCKETS; |
| |
| private static final int BUCKET_MASK = 0x7ff; |
| private static final int MIN_BYTES_REQUIRED = 10; |
| private static final int BITS_PER_BUCKET = 4; |
| private static final int RANGE = (int) Math.pow(2, BITS_PER_BUCKET) - 1; |
| |
| private static final double[][] MIN_NUM_REGISTER_LOOKUP = new double[64][256]; |
| |
| static { |
| for (int registerOffset = 0; registerOffset < 64; ++registerOffset) { |
| for (int register = 0; register < 256; ++register) { |
| final int upper = ((register & 0xf0) >> 4) + registerOffset; |
| final int lower = (register & 0x0f) + registerOffset; |
| MIN_NUM_REGISTER_LOOKUP[registerOffset][register] = 1.0d / Math.pow(2, upper) + 1.0d / Math.pow(2, lower); |
| } |
| } |
| } |
| |
| // we have to keep track of the number of zeroes in each of the two halves of the byte register (0, 1, or 2) |
| private static final int[] NUM_ZERO_LOOKUP = new int[256]; |
| |
| static { |
| for (int i = 0; i < NUM_ZERO_LOOKUP.length; ++i) { |
| NUM_ZERO_LOOKUP[i] = (((i & 0xf0) == 0) ? 1 : 0) + (((i & 0x0f) == 0) ? 1 : 0); |
| } |
| } |
| |
| // Methods to build the latest HLLC |
| public static HyperLogLogCollector makeLatestCollector() |
| { |
| return new VersionOneHyperLogLogCollector(); |
| } |
| |
| /** |
| * Create a wrapper object around an HLL sketch contained within a buffer. The position and limit of |
| * the buffer may be changed; if you do not want this to happen, you can duplicate the buffer before |
| * passing it in. |
| * |
| * The mark and byte order of the buffer will not be modified. |
| * |
| * @param buffer buffer containing an HLL sketch starting at its position and ending at its limit |
| * |
| * @return HLLC wrapper object |
| */ |
| public static HyperLogLogCollector makeCollector(ByteBuffer buffer) |
| { |
| int remaining = buffer.remaining(); |
| if (remaining % 3 == 0 || remaining == 1027) { |
| return new VersionZeroHyperLogLogCollector(buffer); |
| } else { |
| return new VersionOneHyperLogLogCollector(buffer); |
| } |
| } |
| |
| /** |
| * Creates new collector which shares others collector buffer (by using {@link ByteBuffer#duplicate()}) |
| * |
| * @param otherCollector collector which buffer will be shared |
| * @return collector |
| */ |
| public static HyperLogLogCollector makeCollectorSharingStorage(HyperLogLogCollector otherCollector) |
| { |
| return makeCollector(otherCollector.getStorageBuffer().duplicate()); |
| } |
| |
| public static int getLatestNumBytesForDenseStorage() |
| { |
| return VersionOneHyperLogLogCollector.NUM_BYTES_FOR_DENSE_STORAGE; |
| } |
| |
| public static byte[] makeEmptyVersionedByteArray() |
| { |
| byte[] arr = new byte[getLatestNumBytesForDenseStorage()]; |
| arr[0] = VersionOneHyperLogLogCollector.VERSION; |
| return arr; |
| } |
| |
| public static double applyCorrection(double e, int zeroCount) |
| { |
| e = CORRECTION_PARAMETER / e; |
| |
| if (e <= LOW_CORRECTION_THRESHOLD) { |
| return zeroCount == 0 ? e : NUM_BUCKETS * Math.log(NUM_BUCKETS / (double) zeroCount); |
| } |
| |
| if (e > HIGH_CORRECTION_THRESHOLD) { |
| final double ratio = e / TWO_TO_THE_SIXTY_FOUR; |
| if (ratio >= 1) { |
| // handle very unlikely case that value is > 2^64 |
| return Double.POSITIVE_INFINITY; |
| } else { |
| return -TWO_TO_THE_SIXTY_FOUR * Math.log(1 - ratio); |
| } |
| } |
| |
| return e; |
| } |
| |
| public static double estimateByteBuffer(ByteBuffer buf) |
| { |
| return makeCollector(buf.duplicate()).estimateCardinality(); |
| } |
| |
| private static double estimateSparse( |
| final ByteBuffer buf, |
| final byte minNum, |
| final byte overflowValue, |
| final short overflowPosition, |
| final boolean isUpperNibble |
| ) |
| { |
| final ByteBuffer copy = buf.asReadOnlyBuffer(); |
| double e = 0.0d; |
| int zeroCount = NUM_BUCKETS - 2 * (buf.remaining() / 3); |
| while (copy.hasRemaining()) { |
| short position = copy.getShort(); |
| final int register = (int) copy.get() & 0xff; |
| if (overflowValue != 0 && position == overflowPosition) { |
| int upperNibble = ((register & 0xf0) >>> BITS_PER_BUCKET) + minNum; |
| int lowerNibble = (register & 0x0f) + minNum; |
| if (isUpperNibble) { |
| upperNibble = Math.max(upperNibble, overflowValue); |
| } else { |
| lowerNibble = Math.max(lowerNibble, overflowValue); |
| } |
| e += 1.0d / Math.pow(2, upperNibble) + 1.0d / Math.pow(2, lowerNibble); |
| zeroCount += (((upperNibble & 0xf0) == 0) ? 1 : 0) + (((lowerNibble & 0x0f) == 0) ? 1 : 0); |
| } else { |
| e += MIN_NUM_REGISTER_LOOKUP[minNum][register]; |
| zeroCount += NUM_ZERO_LOOKUP[register]; |
| } |
| } |
| |
| e += zeroCount; |
| return applyCorrection(e, zeroCount); |
| } |
| |
| private static double estimateDense( |
| final ByteBuffer buf, |
| final byte minNum, |
| final byte overflowValue, |
| final short overflowPosition, |
| final boolean isUpperNibble |
| ) |
| { |
| final ByteBuffer copy = buf.asReadOnlyBuffer(); |
| double e = 0.0d; |
| int zeroCount = 0; |
| int position = 0; |
| while (copy.hasRemaining()) { |
| final int register = (int) copy.get() & 0xff; |
| if (overflowValue != 0 && position == overflowPosition) { |
| int upperNibble = ((register & 0xf0) >>> BITS_PER_BUCKET) + minNum; |
| int lowerNibble = (register & 0x0f) + minNum; |
| if (isUpperNibble) { |
| upperNibble = Math.max(upperNibble, overflowValue); |
| } else { |
| lowerNibble = Math.max(lowerNibble, overflowValue); |
| } |
| e += 1.0d / Math.pow(2, upperNibble) + 1.0d / Math.pow(2, lowerNibble); |
| zeroCount += (((upperNibble & 0xf0) == 0) ? 1 : 0) + (((lowerNibble & 0x0f) == 0) ? 1 : 0); |
| } else { |
| e += MIN_NUM_REGISTER_LOOKUP[minNum][register]; |
| zeroCount += NUM_ZERO_LOOKUP[register]; |
| } |
| position++; |
| } |
| |
| return applyCorrection(e, zeroCount); |
| } |
| |
| /** |
| * Checks if the payload for the given ByteBuffer is sparse or not. |
| * The given buffer must be positioned at getPayloadBytePosition() prior to calling isSparse |
| */ |
| private static boolean isSparse(ByteBuffer buffer) |
| { |
| return buffer.remaining() != NUM_BYTES_FOR_BUCKETS; |
| } |
| |
| private ByteBuffer storageBuffer; |
| private int initPosition; |
| private Double estimatedCardinality; |
| |
| public HyperLogLogCollector(ByteBuffer byteBuffer) |
| { |
| storageBuffer = byteBuffer; |
| initPosition = byteBuffer.position(); |
| estimatedCardinality = null; |
| } |
| |
| public abstract byte getVersion(); |
| |
| public abstract void setVersion(ByteBuffer buffer); |
| |
| public abstract byte getRegisterOffset(); |
| |
| public abstract void setRegisterOffset(byte registerOffset); |
| |
| public abstract void setRegisterOffset(ByteBuffer buffer, byte registerOffset); |
| |
| public abstract short getNumNonZeroRegisters(); |
| |
| public abstract void setNumNonZeroRegisters(short numNonZeroRegisters); |
| |
| public abstract void setNumNonZeroRegisters(ByteBuffer buffer, short numNonZeroRegisters); |
| |
| public abstract byte getMaxOverflowValue(); |
| |
| public abstract void setMaxOverflowValue(byte value); |
| |
| public abstract void setMaxOverflowValue(ByteBuffer buffer, byte value); |
| |
| public abstract short getMaxOverflowRegister(); |
| |
| public abstract void setMaxOverflowRegister(short register); |
| |
| public abstract void setMaxOverflowRegister(ByteBuffer buffer, short register); |
| |
| public abstract int getNumHeaderBytes(); |
| |
| public abstract int getNumBytesForDenseStorage(); |
| |
| public abstract int getPayloadBytePosition(); |
| |
| public abstract int getPayloadBytePosition(ByteBuffer buffer); |
| |
| protected int getInitPosition() |
| { |
| return initPosition; |
| } |
| |
| protected ByteBuffer getStorageBuffer() |
| { |
| return storageBuffer; |
| } |
| |
| public void add(byte[] hashedValue) |
| { |
| if (hashedValue.length < MIN_BYTES_REQUIRED) { |
| throw new IAE("Insufficient bytes, need[%d] got [%d]", MIN_BYTES_REQUIRED, hashedValue.length); |
| } |
| |
| estimatedCardinality = null; |
| |
| final ByteBuffer buffer = ByteBuffer.wrap(hashedValue); |
| |
| short bucket = (short) (buffer.getShort(hashedValue.length - 2) & BUCKET_MASK); |
| |
| byte positionOf1 = 0; |
| |
| for (int i = 0; i < 8; ++i) { |
| byte lookupVal = ByteBitLookup.LOOKUP[UnsignedBytes.toInt(hashedValue[i])]; |
| switch (lookupVal) { |
| case 0: |
| positionOf1 += (byte) 8; |
| continue; |
| default: |
| positionOf1 += lookupVal; |
| i = 8; |
| break; |
| } |
| } |
| |
| add(bucket, positionOf1); |
| } |
| |
| public void add(short bucket, byte positionOf1) |
| { |
| if (storageBuffer.isReadOnly()) { |
| convertToMutableByteBuffer(); |
| } |
| |
| byte registerOffset = getRegisterOffset(); |
| |
| // discard everything outside of the range we care about |
| if (positionOf1 <= registerOffset) { |
| return; |
| } else if (positionOf1 > (registerOffset + RANGE)) { |
| final byte currMax = getMaxOverflowValue(); |
| if (positionOf1 > currMax) { |
| if (currMax <= (registerOffset + RANGE)) { |
| // this could be optimized by having an add without sanity checks |
| add(getMaxOverflowRegister(), currMax); |
| } |
| setMaxOverflowValue(positionOf1); |
| setMaxOverflowRegister(bucket); |
| } |
| return; |
| } |
| |
| // whatever value we add must be stored in 4 bits |
| short numNonZeroRegisters = addNibbleRegister(bucket, (byte) ((0xff & positionOf1) - registerOffset)); |
| setNumNonZeroRegisters(numNonZeroRegisters); |
| if (numNonZeroRegisters == NUM_BUCKETS) { |
| setRegisterOffset((byte) (registerOffset + 1)); |
| setNumNonZeroRegisters(decrementBuckets()); |
| } |
| } |
| |
| public HyperLogLogCollector fold(@Nullable HyperLogLogCollector other) |
| { |
| if (other == null || other.storageBuffer.remaining() == 0) { |
| return this; |
| } |
| |
| if (storageBuffer.isReadOnly()) { |
| convertToMutableByteBuffer(); |
| } |
| |
| if (storageBuffer.remaining() != getNumBytesForDenseStorage()) { |
| convertToDenseStorage(); |
| } |
| |
| estimatedCardinality = null; |
| |
| if (getRegisterOffset() < other.getRegisterOffset()) { |
| // "Swap" the buffers so that we are folding into the one with the higher offset |
| final ByteBuffer tmpBuffer = ByteBuffer.allocate(storageBuffer.remaining()); |
| tmpBuffer.put(storageBuffer.asReadOnlyBuffer()); |
| tmpBuffer.clear(); |
| |
| storageBuffer.duplicate().put(other.storageBuffer.asReadOnlyBuffer()); |
| |
| if (other.storageBuffer.remaining() != other.getNumBytesForDenseStorage()) { |
| // The other buffer was sparse, densify it |
| final int newLImit = storageBuffer.position() + other.storageBuffer.remaining(); |
| storageBuffer.limit(newLImit); |
| convertToDenseStorage(); |
| } |
| |
| other = HyperLogLogCollector.makeCollector(tmpBuffer); |
| } |
| |
| final ByteBuffer otherBuffer = other.storageBuffer; |
| |
| // Save position and restore later to avoid allocations due to duplicating the otherBuffer object. |
| final int otherPosition = otherBuffer.position(); |
| |
| try { |
| final byte otherOffset = other.getRegisterOffset(); |
| |
| byte myOffset = getRegisterOffset(); |
| short numNonZero = getNumNonZeroRegisters(); |
| |
| final int offsetDiff = myOffset - otherOffset; |
| if (offsetDiff < 0) { |
| throw new ISE("offsetDiff[%d] < 0, shouldn't happen because of swap.", offsetDiff); |
| } |
| |
| final int myPayloadStart = getPayloadBytePosition(); |
| otherBuffer.position(other.getPayloadBytePosition()); |
| |
| if (isSparse(otherBuffer)) { |
| while (otherBuffer.hasRemaining()) { |
| final int payloadStartPosition = otherBuffer.getShort() - other.getNumHeaderBytes(); |
| numNonZero += mergeAndStoreByteRegister( |
| storageBuffer, |
| myPayloadStart + payloadStartPosition, |
| offsetDiff, |
| otherBuffer.get() |
| ); |
| } |
| if (numNonZero == NUM_BUCKETS) { |
| numNonZero = decrementBuckets(); |
| setRegisterOffset((byte) (myOffset + 1)); |
| setNumNonZeroRegisters(numNonZero); |
| } |
| } else { // dense |
| int position = getPayloadBytePosition(); |
| while (otherBuffer.hasRemaining()) { |
| numNonZero += mergeAndStoreByteRegister( |
| storageBuffer, |
| position, |
| offsetDiff, |
| otherBuffer.get() |
| ); |
| position++; |
| } |
| if (numNonZero == NUM_BUCKETS) { |
| numNonZero = decrementBuckets(); |
| setRegisterOffset((byte) (myOffset + 1)); |
| setNumNonZeroRegisters(numNonZero); |
| } |
| } |
| |
| // no need to call setRegisterOffset(myOffset) here, since it gets updated every time myOffset is incremented |
| setNumNonZeroRegisters(numNonZero); |
| |
| // this will add the max overflow and also recheck if offset needs to be shifted |
| add(other.getMaxOverflowRegister(), other.getMaxOverflowValue()); |
| |
| return this; |
| } |
| finally { |
| otherBuffer.position(otherPosition); |
| } |
| } |
| |
| public HyperLogLogCollector fold(ByteBuffer buffer) |
| { |
| return fold(makeCollector(buffer.duplicate())); |
| } |
| |
| public ByteBuffer toByteBuffer() |
| { |
| |
| final short numNonZeroRegisters = getNumNonZeroRegisters(); |
| |
| // store sparsely |
| if (storageBuffer.remaining() == getNumBytesForDenseStorage() && numNonZeroRegisters < DENSE_THRESHOLD) { |
| final ByteBuffer retVal = ByteBuffer.wrap(new byte[numNonZeroRegisters * 3 + getNumHeaderBytes()]); |
| setVersion(retVal); |
| setRegisterOffset(retVal, getRegisterOffset()); |
| setNumNonZeroRegisters(retVal, numNonZeroRegisters); |
| setMaxOverflowValue(retVal, getMaxOverflowValue()); |
| setMaxOverflowRegister(retVal, getMaxOverflowRegister()); |
| |
| final int startPosition = getPayloadBytePosition(); |
| retVal.position(getPayloadBytePosition(retVal)); |
| |
| final byte[] zipperBuffer = new byte[NUM_BYTES_FOR_BUCKETS]; |
| ByteBuffer roStorageBuffer = storageBuffer.asReadOnlyBuffer(); |
| roStorageBuffer.position(startPosition); |
| roStorageBuffer.get(zipperBuffer); |
| |
| for (int i = 0; i < NUM_BYTES_FOR_BUCKETS; ++i) { |
| if (zipperBuffer[i] != 0) { |
| final short val = (short) (0xffff & (i + startPosition - initPosition)); |
| retVal.putShort(val); |
| retVal.put(zipperBuffer[i]); |
| } |
| } |
| retVal.rewind(); |
| return retVal.asReadOnlyBuffer(); |
| } |
| |
| return storageBuffer.asReadOnlyBuffer(); |
| } |
| |
| @JsonValue |
| public byte[] toByteArray() |
| { |
| final ByteBuffer buffer = toByteBuffer(); |
| byte[] theBytes = new byte[buffer.remaining()]; |
| buffer.get(theBytes); |
| |
| return theBytes; |
| } |
| |
| public long estimateCardinalityRound() |
| { |
| return Math.round(estimateCardinality()); |
| } |
| |
| public double estimateCardinality() |
| { |
| if (estimatedCardinality == null) { |
| byte registerOffset = getRegisterOffset(); |
| byte overflowValue = getMaxOverflowValue(); |
| short overflowRegister = getMaxOverflowRegister(); |
| short overflowPosition = (short) (overflowRegister >>> 1); |
| boolean isUpperNibble = ((overflowRegister & 0x1) == 0); |
| |
| storageBuffer.position(getPayloadBytePosition()); |
| |
| if (isSparse(storageBuffer)) { |
| estimatedCardinality = estimateSparse( |
| storageBuffer, |
| registerOffset, |
| overflowValue, |
| overflowPosition, |
| isUpperNibble |
| ); |
| } else { |
| estimatedCardinality = estimateDense( |
| storageBuffer, |
| registerOffset, |
| overflowValue, |
| overflowPosition, |
| isUpperNibble |
| ); |
| } |
| |
| storageBuffer.position(initPosition); |
| } |
| return estimatedCardinality; |
| } |
| |
| @Override |
| public boolean equals(Object o) |
| { |
| if (this == o) { |
| return true; |
| } |
| if (o == null || getClass() != o.getClass()) { |
| return false; |
| } |
| |
| ByteBuffer otherBuffer = ((HyperLogLogCollector) o).storageBuffer; |
| |
| if (storageBuffer != null ? false : otherBuffer != null) { |
| return false; |
| } |
| |
| if (storageBuffer == null && otherBuffer == null) { |
| return true; |
| } |
| |
| final ByteBuffer denseStorageBuffer; |
| if (storageBuffer.remaining() != getNumBytesForDenseStorage()) { |
| HyperLogLogCollector denseCollector = HyperLogLogCollector.makeCollector(storageBuffer.duplicate()); |
| denseCollector.convertToDenseStorage(); |
| denseStorageBuffer = denseCollector.storageBuffer; |
| } else { |
| denseStorageBuffer = storageBuffer; |
| } |
| |
| if (otherBuffer.remaining() != getNumBytesForDenseStorage()) { |
| HyperLogLogCollector otherCollector = HyperLogLogCollector.makeCollector(otherBuffer.duplicate()); |
| otherCollector.convertToDenseStorage(); |
| otherBuffer = otherCollector.storageBuffer; |
| } |
| |
| return denseStorageBuffer.equals(otherBuffer); |
| } |
| |
| @Override |
| public int hashCode() |
| { |
| int result = storageBuffer != null ? storageBuffer.hashCode() : 0; |
| result = 31 * result + initPosition; |
| return result; |
| } |
| |
| @Override |
| public String toString() |
| { |
| return "HyperLogLogCollector{" + |
| "initPosition=" + initPosition + |
| ", version=" + getVersion() + |
| ", registerOffset=" + getRegisterOffset() + |
| ", numNonZeroRegisters=" + getNumNonZeroRegisters() + |
| ", maxOverflowValue=" + getMaxOverflowValue() + |
| ", maxOverflowRegister=" + getMaxOverflowRegister() + |
| '}'; |
| } |
| |
| private short decrementBuckets() |
| { |
| final int startPosition = getPayloadBytePosition(); |
| short count = 0; |
| for (int i = startPosition; i < startPosition + NUM_BYTES_FOR_BUCKETS; i++) { |
| final byte val = (byte) (storageBuffer.get(i) - 0x11); |
| if ((val & 0xf0) != 0) { |
| ++count; |
| } |
| if ((val & 0x0f) != 0) { |
| ++count; |
| } |
| storageBuffer.put(i, val); |
| } |
| return count; |
| } |
| |
| private void convertToMutableByteBuffer() |
| { |
| ByteBuffer tmpBuffer = ByteBuffer.allocate(storageBuffer.remaining()); |
| tmpBuffer.put(storageBuffer.asReadOnlyBuffer()); |
| tmpBuffer.position(0); |
| storageBuffer = tmpBuffer; |
| initPosition = 0; |
| } |
| |
| private void convertToDenseStorage() |
| { |
| ByteBuffer tmpBuffer = ByteBuffer.allocate(getNumBytesForDenseStorage()); |
| // put header |
| setVersion(tmpBuffer); |
| setRegisterOffset(tmpBuffer, getRegisterOffset()); |
| setNumNonZeroRegisters(tmpBuffer, getNumNonZeroRegisters()); |
| setMaxOverflowValue(tmpBuffer, getMaxOverflowValue()); |
| setMaxOverflowRegister(tmpBuffer, getMaxOverflowRegister()); |
| |
| storageBuffer.position(getPayloadBytePosition()); |
| tmpBuffer.position(getPayloadBytePosition(tmpBuffer)); |
| // put payload |
| while (storageBuffer.hasRemaining()) { |
| tmpBuffer.put(storageBuffer.getShort(), storageBuffer.get()); |
| } |
| tmpBuffer.rewind(); |
| storageBuffer = tmpBuffer; |
| initPosition = 0; |
| } |
| |
| private short addNibbleRegister(short bucket, byte positionOf1) |
| { |
| short numNonZeroRegs = getNumNonZeroRegisters(); |
| |
| final int position = getPayloadBytePosition() + (short) (bucket >> 1); |
| final boolean isUpperNibble = ((bucket & 0x1) == 0); |
| |
| final byte shiftedPositionOf1 = (isUpperNibble) ? (byte) (positionOf1 << BITS_PER_BUCKET) : positionOf1; |
| |
| if (storageBuffer.remaining() != getNumBytesForDenseStorage()) { |
| convertToDenseStorage(); |
| } |
| |
| final byte origVal = storageBuffer.get(position); |
| final byte newValueMask = (isUpperNibble) ? (byte) 0xf0 : (byte) 0x0f; |
| final byte originalValueMask = (byte) (newValueMask ^ 0xff); |
| |
| // if something was at zero, we have to increase the numNonZeroRegisters |
| if ((origVal & newValueMask) == 0 && shiftedPositionOf1 != 0) { |
| numNonZeroRegs++; |
| } |
| |
| storageBuffer.put( |
| position, |
| (byte) (UnsignedBytes.max((byte) (origVal & newValueMask), shiftedPositionOf1) | (origVal & originalValueMask)) |
| ); |
| |
| return numNonZeroRegs; |
| } |
| |
| /** |
| * Returns the number of registers that are no longer zero after the value was added |
| * |
| * @param position The position into the byte buffer, this position represents two "registers" |
| * @param offsetDiff The difference in offset between the byteToAdd and the current HyperLogLogCollector |
| * @param byteToAdd The byte to merge into the current HyperLogLogCollector |
| */ |
| private static short mergeAndStoreByteRegister( |
| final ByteBuffer storageBuffer, |
| final int position, |
| final int offsetDiff, |
| final byte byteToAdd |
| ) |
| { |
| if (byteToAdd == 0) { |
| return 0; |
| } |
| |
| final byte currVal = storageBuffer.get(position); |
| |
| final int upperNibble = currVal & 0xf0; |
| final int lowerNibble = currVal & 0x0f; |
| |
| // subtract the differences so that the nibbles align |
| final int otherUpper = (byteToAdd & 0xf0) - (offsetDiff << BITS_PER_BUCKET); |
| final int otherLower = (byteToAdd & 0x0f) - offsetDiff; |
| |
| final int newUpper = Math.max(upperNibble, otherUpper); |
| final int newLower = Math.max(lowerNibble, otherLower); |
| |
| storageBuffer.put(position, (byte) ((newUpper | newLower) & 0xff)); |
| |
| short numNoLongerZero = 0; |
| if (upperNibble == 0 && newUpper > 0) { |
| ++numNoLongerZero; |
| } |
| if (lowerNibble == 0 && newLower > 0) { |
| ++numNoLongerZero; |
| } |
| |
| return numNoLongerZero; |
| } |
| |
| @Override |
| public int compareTo(HyperLogLogCollector other) |
| { |
| return Double.compare(this.estimateCardinality(), other.estimateCardinality()); |
| } |
| } |