// blob: 882c5e2e9dc310eb628aa7589efcb56197ad1dac [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.datasketches.theta;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.datasketches.common.Util.LONG_MAX_VALUE_AS_DOUBLE;
import static org.apache.datasketches.common.Util.checkBounds;
import static org.apache.datasketches.hash.MurmurHash3.hash;
import static org.apache.datasketches.theta.CompactOperations.componentsToCompact;
import static org.apache.datasketches.theta.PreambleUtil.BIG_ENDIAN_FLAG_MASK;
import static org.apache.datasketches.theta.PreambleUtil.COMPACT_FLAG_MASK;
import static org.apache.datasketches.theta.PreambleUtil.FAMILY_BYTE;
import static org.apache.datasketches.theta.PreambleUtil.ORDERED_FLAG_MASK;
import static org.apache.datasketches.theta.PreambleUtil.PREAMBLE_LONGS_BYTE;
import static org.apache.datasketches.theta.PreambleUtil.READ_ONLY_FLAG_MASK;
import static org.apache.datasketches.theta.PreambleUtil.SER_VER;
import static org.apache.datasketches.theta.PreambleUtil.SER_VER_BYTE;
import static org.apache.datasketches.theta.PreambleUtil.checkMemorySeedHash;
import static org.apache.datasketches.theta.PreambleUtil.extractFamilyID;
import static org.apache.datasketches.theta.PreambleUtil.extractFlags;
import static org.apache.datasketches.theta.PreambleUtil.extractLgResizeFactor;
import static org.apache.datasketches.theta.PreambleUtil.extractP;
import static org.apache.datasketches.theta.PreambleUtil.extractSerVer;
import static org.apache.datasketches.theta.PreambleUtil.extractThetaLong;
import static org.apache.datasketches.theta.PreambleUtil.getMemBytes;
import static org.apache.datasketches.theta.UpdateReturnState.RejectedNullOrEmpty;
import java.nio.ByteBuffer;
import java.util.Objects;
import org.apache.datasketches.common.Family;
import org.apache.datasketches.common.ResizeFactor;
import org.apache.datasketches.common.SketchesArgumentException;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.datasketches.thetacommon.ThetaUtil;
/**
* The parent class for the Update Sketch families, such as QuickSelect and Alpha.
* The primary task of an Update Sketch is to consider datums presented via the update() methods
* for inclusion in its internal cache. This is the sketch building process.
*
* @author Lee Rhodes
*/
public abstract class UpdateSketch extends Sketch {
//Package-private, no-argument constructor with an empty body; it is the only constructor declared in this class.
UpdateSketch() {}
/**
 * Wraps the given sketch image in Memory and refers to it directly. There is no data copying
 * onto the java heap. This is a convenience overload that delegates to
 * {@link #wrap(WritableMemory, long)} using the
 * {@link org.apache.datasketches.thetacommon.ThetaUtil#DEFAULT_UPDATE_SEED}.
 * <a href="{@docRoot}/resources/dictionary.html#defaultUpdateSeed">See Default Update Seed</a>.
 * @param srcMem an image of a Sketch where the image seed hash matches the default seed hash.
 * It must have a size of at least 24 bytes.
 * <a href="{@docRoot}/resources/dictionary.html#mem">See Memory</a>
 * @return an UpdateSketch backed by the given Memory
 */
public static UpdateSketch wrap(final WritableMemory srcMem) {
  final long defaultSeed = ThetaUtil.DEFAULT_UPDATE_SEED;
  return wrap(srcMem, defaultSeed);
}
/**
 * Wraps the given sketch image in Memory and refers to it directly. There is no data copying
 * onto the java heap. Only images of the QUICKSELECT family that carry Serialization Version 3
 * and a preamble of exactly 3 longs can be wrapped. Any other image is rejected with a
 * {@link SketchesArgumentException}; this method never falls back to creating an on-heap copy.
 * @param srcMem an image of a Sketch where the image seed hash matches the given seed hash.
 * It must not be null and must have a size of at least 24 bytes.
 * <a href="{@docRoot}/resources/dictionary.html#mem">See Memory</a>
 * @param expectedSeed the seed used to validate the given Memory image.
 * <a href="{@docRoot}/resources/dictionary.html#seed">See Update Hash Seed</a>.
 * Compact sketches store a 16-bit hash of the seed, but not the seed itself.
 * @return a UpdateSketch backed by the given Memory
 * @throws NullPointerException if srcMem is null
 * @throws SketchesArgumentException if the image family is not QUICKSELECT, or if the image
 * does not have SerVer = 3 and preLongs = 3
 */
public static UpdateSketch wrap(final WritableMemory srcMem, final long expectedSeed) {
  Objects.requireNonNull(srcMem, "Source Memory must not be null");
  checkBounds(0, 24, srcMem.getCapacity()); //every byte read below lies within the first 24 bytes

  //Family first: anything other than QUICKSELECT is rejected outright.
  final Family imageFamily = Family.idToFamily(srcMem.getByte(FAMILY_BYTE) & 0XFF);
  if (imageFamily != Family.QUICKSELECT) {
    throw new SketchesArgumentException(
        "A " + imageFamily + " sketch cannot be wrapped as an UpdateSketch.");
  }

  final int imageSerVer = srcMem.getByte(SER_VER_BYTE) & 0XFF;
  final int imagePreLongs = srcMem.getByte(PREAMBLE_LONGS_BYTE) & 0X3F; //low 6 bits only
  final boolean wrappable = (imageSerVer == 3) && (imagePreLongs == 3);
  if (!wrappable) {
    throw new SketchesArgumentException(
        "Corrupted: An UpdateSketch image must have SerVer = 3 and preLongs = 3");
  }
  return DirectQuickSelectSketch.writableWrap(srcMem, expectedSeed);
}
/**
 * Instantiates an on-heap UpdateSketch from Memory. This is a convenience overload that
 * delegates to {@link #heapify(Memory, long)} using the
 * {@link org.apache.datasketches.thetacommon.ThetaUtil#DEFAULT_UPDATE_SEED}.
 * @param srcMem <a href="{@docRoot}/resources/dictionary.html#mem">See Memory</a>
 * It must have a size of at least 24 bytes.
 * @return an UpdateSketch
 */
public static UpdateSketch heapify(final Memory srcMem) {
  final long defaultSeed = ThetaUtil.DEFAULT_UPDATE_SEED;
  return heapify(srcMem, defaultSeed);
}
/**
 * Instantiates an on-heap UpdateSketch from Memory.
 * An image whose family is ALPHA is handed to HeapAlphaSketch; an image of any other family
 * is handed to HeapQuickSelectSketch.
 * @param srcMem <a href="{@docRoot}/resources/dictionary.html#mem">See Memory</a>
 * It must not be null and must have a size of at least 24 bytes.
 * @param expectedSeed the seed used to validate the given Memory image.
 * <a href="{@docRoot}/resources/dictionary.html#seed">See Update Hash Seed</a>.
 * @return an UpdateSketch
 * @throws NullPointerException if srcMem is null
 */
public static UpdateSketch heapify(final Memory srcMem, final long expectedSeed) {
  Objects.requireNonNull(srcMem, "Source Memory must not be null");
  checkBounds(0, 24, srcMem.getCapacity()); //need min 24 bytes
  //Mask to an unsigned byte, exactly as wrap() does, so that idToFamily always receives a
  //value in 0..255 and never a sign-extended negative number.
  final int familyID = srcMem.getByte(FAMILY_BYTE) & 0XFF;
  final Family family = Family.idToFamily(familyID);
  if (family.equals(Family.ALPHA)) {
    return HeapAlphaSketch.heapifyInstance(srcMem, expectedSeed);
  }
  return HeapQuickSelectSketch.heapifyInstance(srcMem, expectedSeed);
}
//Sketch interface
/**
 * Builds a CompactSketch from the current state of this sketch by passing its theta, retained
 * entry count, seed hash, empty state and cache to componentsToCompact.
 * @param dstOrdered forwarded unchanged to componentsToCompact
 * @param dstMem forwarded unchanged to componentsToCompact
 * @return the CompactSketch produced by componentsToCompact
 */
@Override
public CompactSketch compact(final boolean dstOrdered, final WritableMemory dstMem) {
  final long curThetaLong = getThetaLong();
  final int curCount = getRetainedEntries(true);
  //NOTE(review): the two literal 'false' arguments presumably describe this source as neither
  //compact nor ordered (cf. isCompact() / isOrdered()) -- confirm against componentsToCompact.
  return componentsToCompact(
      curThetaLong, curCount, getSeedHash(), isEmpty(),
      false, false,
      dstOrdered, dstMem, getCache());
}
/**
 * Computes a byte count as (compact preamble longs + retained entries) * 8.
 * @return the number of bytes computed from the compact preamble and the retained entries
 */
@Override
public int getCompactBytes() {
  final int totalLongs = getCompactPreambleLongs() + getRetainedEntries(true);
  return totalLongs << 3; //8 bytes per long
}
/**
 * Returns 2 raised to the power of {@link #getLgArrLongs()}.
 * @return 1 &lt;&lt; getLgArrLongs()
 */
@Override
int getCurrentDataLongs() {
  final int lgArrLongs = getLgArrLongs();
  return 1 << lgArrLongs;
}
//This implementation unconditionally reports false.
@Override
public boolean isCompact() {
return false;
}
//This implementation unconditionally reports false.
@Override
public boolean isOrdered() {
return false;
}
//UpdateSketch interface
/**
 * Returns a new, default-constructed {@link UpdateSketchBuilder}.
 * A fresh builder instance is created on every call.
 * @return a new builder
 */
public static final UpdateSketchBuilder builder() {
return new UpdateSketchBuilder();
}
/**
 * Returns the configured ResizeFactor.
 * This method is abstract; each concrete sketch supplies the value.
 * @return the configured ResizeFactor
 */
public abstract ResizeFactor getResizeFactor();
/**
 * Gets the configured sampling probability, <i>p</i>.
 * <a href="{@docRoot}/resources/dictionary.html#p">See Sampling Probability, <i>p</i></a>
 * This method is abstract and package-private.
 * @return the sampling probability, <i>p</i>
 */
abstract float getP();
/**
 * Gets the configured seed.
 * Every update() overload in this class passes this value as the seed argument of the
 * hash function.
 * @return the configured seed
 */
abstract long getSeed();
/**
 * Resets this sketch back to a virgin empty state.
 * This method is abstract; each concrete sketch supplies the implementation.
 */
public abstract void reset();
/**
 * Rebuilds the hash table to remove dirty values or to reduce the size
 * to nominal entries.
 * This method is abstract; each concrete sketch supplies the implementation.
 * @return this sketch
 */
public abstract UpdateSketch rebuild();
/**
 * Present this sketch with a long.
 * The datum is wrapped in a one-element long array, hashed with the configured seed, and the
 * first hash word, shifted right by one bit (unsigned), is passed to hashUpdate.
 *
 * @param datum The given long datum.
 * @return
 * <a href="{@docRoot}/resources/dictionary.html#updateReturnState">See Update Return State</a>
 */
public UpdateReturnState update(final long datum) {
  final long[] hashWords = hash(new long[] { datum }, getSeed());
  final long positiveHash = hashWords[0] >>> 1; //unsigned shift clears the sign bit
  return hashUpdate(positiveHash);
}
/**
 * Present this sketch with the given double (or float) datum.
 * The double will be converted to a long using Double.doubleToLongBits(datum),
 * which normalizes all NaN values to a single NaN representation.
 * Plus and minus zero will be normalized to plus zero.
 * The special floating-point values NaN and +/- Infinity are treated as distinct.
 *
 * @param datum The given double datum.
 * @return
 * <a href="{@docRoot}/resources/dictionary.html#updateReturnState">See Update Return State</a>
 */
public UpdateReturnState update(final double datum) {
  final double canonical;
  if (datum == 0.0) { //true for both +0.0 and -0.0
    canonical = 0.0;
  } else {
    canonical = datum;
  }
  final long bits = Double.doubleToLongBits(canonical); //collapses every NaN bit pattern into one
  final long[] hashWords = hash(new long[] { bits }, getSeed());
  return hashUpdate(hashWords[0] >>> 1);
}
/**
 * Present this sketch with the given String.
 * The string is converted to a byte array using UTF8 encoding.
 * If the string is null or empty no update attempt is made and the method returns
 * RejectedNullOrEmpty.
 *
 * <p>Note: this will not produce the same output hash values as the {@link #update(char[])}
 * method and will generally be a little slower depending on the complexity of the UTF8 encoding.
 * </p>
 *
 * @param datum The given String.
 * @return
 * <a href="{@docRoot}/resources/dictionary.html#updateReturnState">See Update Return State</a>
 */
public UpdateReturnState update(final String datum) {
  final boolean hasContent = (datum != null) && !datum.isEmpty();
  if (!hasContent) {
    return RejectedNullOrEmpty;
  }
  final long[] hashWords = hash(datum.getBytes(UTF_8), getSeed());
  return hashUpdate(hashWords[0] >>> 1);
}
/**
 * Present this sketch with the given byte array.
 * If the byte array is null or empty no update attempt is made and the method returns
 * RejectedNullOrEmpty.
 *
 * @param data The given byte array.
 * @return
 * <a href="{@docRoot}/resources/dictionary.html#updateReturnState">See Update Return State</a>
 */
public UpdateReturnState update(final byte[] data) {
  final boolean nothingToHash = (data == null) || (data.length == 0);
  if (nothingToHash) {
    return RejectedNullOrEmpty;
  }
  final long[] hashWords = hash(data, getSeed());
  return hashUpdate(hashWords[0] >>> 1);
}
/**
 * Present this sketch with the given ByteBuffer.
 * If the ByteBuffer is null or has no remaining bytes, no update attempt is made and the
 * method returns RejectedNullOrEmpty.
 *
 * @param buffer the input ByteBuffer
 * @return
 * <a href="{@docRoot}/resources/dictionary.html#updateReturnState">See Update Return State</a>
 */
public UpdateReturnState update(final ByteBuffer buffer) {
  if ((buffer != null) && buffer.hasRemaining()) {
    final long[] hashWords = hash(buffer, getSeed());
    return hashUpdate(hashWords[0] >>> 1);
  }
  return RejectedNullOrEmpty;
}
/**
 * Present this sketch with the given char array.
 * If the char array is null or empty no update attempt is made and the method returns
 * RejectedNullOrEmpty.
 *
 * <p>Note: this will not produce the same output hash values as the {@link #update(String)}
 * method but will be a little faster as it avoids the complexity of the UTF8 encoding.</p>
 *
 * @param data The given char array.
 * @return
 * <a href="{@docRoot}/resources/dictionary.html#updateReturnState">See Update Return State</a>
 */
public UpdateReturnState update(final char[] data) {
  if ((data != null) && (data.length > 0)) {
    final long[] hashWords = hash(data, getSeed());
    return hashUpdate(hashWords[0] >>> 1);
  }
  return RejectedNullOrEmpty;
}
/**
 * Present this sketch with the given integer array.
 * If the integer array is null or empty no update attempt is made and the method returns
 * RejectedNullOrEmpty.
 *
 * @param data The given int array.
 * @return
 * <a href="{@docRoot}/resources/dictionary.html#updateReturnState">See Update Return State</a>
 */
public UpdateReturnState update(final int[] data) {
  final boolean nothingToHash = (data == null) || (data.length == 0);
  if (nothingToHash) {
    return RejectedNullOrEmpty;
  }
  final long firstHashWord = hash(data, getSeed())[0];
  return hashUpdate(firstHashWord >>> 1);
}
/**
 * Present this sketch with the given long array.
 * If the long array is null or empty no update attempt is made and the method returns
 * RejectedNullOrEmpty.
 *
 * @param data The given long array.
 * @return
 * <a href="{@docRoot}/resources/dictionary.html#updateReturnState">See Update Return State</a>
 */
public UpdateReturnState update(final long[] data) {
  if ((data != null) && (data.length > 0)) {
    final long firstHashWord = hash(data, getSeed())[0];
    return hashUpdate(firstHashWord >>> 1);
  }
  return RejectedNullOrEmpty;
}
//restricted methods
/**
 * All potential updates converge here.
 * <p>Don't ever call this unless you really know what you are doing!</p>
 *
 * <p>Every update() overload in this class calls this method with the first word of the
 * computed hash shifted right by one bit (unsigned), so those callers never pass a negative
 * value.</p>
 *
 * @param hash the given input hash value. A hash of zero or Long.MAX_VALUE is ignored.
 * A negative hash value will throw an exception.
 * @return <a href="{@docRoot}/resources/dictionary.html#updateReturnState">See Update Return State</a>
 */
abstract UpdateReturnState hashUpdate(long hash);
/**
 * Gets the Log base 2 of the current size of the internal cache.
 * getCurrentDataLongs() in this class returns 1 shifted left by this value.
 * @return the Log base 2 of the current size of the internal cache
 */
abstract int getLgArrLongs();
/**
 * Gets the Log base 2 of the configured nominal entries.
 * This method is abstract; each concrete sketch supplies the value.
 * @return the Log base 2 of the configured nominal entries
 */
public abstract int getLgNomLongs();
/**
 * Returns true if the internal cache contains "dirty" values that are greater than or equal
 * to thetaLong.
 * This method is abstract and package-private.
 * @return true if the internal cache is dirty.
 */
abstract boolean isDirty();
/**
 * Returns true if numEntries (curCount) is greater than the hashTableThreshold.
 * This method is abstract and package-private.
 * @param numEntries the given number of entries (or current count).
 * @return true if numEntries (curCount) is greater than the hashTableThreshold.
 */
abstract boolean isOutOfSpace(int numEntries);
/**
 * Verifies that the image belongs to the UNION or QUICKSELECT family, that the given preamble
 * size equals the minimum preamble size of that family, and that lgNomLongs is not below
 * ThetaUtil.MIN_LG_NOM_LONGS.
 * @param mem the Memory image from which the family ID is extracted
 * @param preambleLongs the preamble size, in longs, to validate
 * @param lgNomLongs the Log base 2 of the nominal entries to validate
 * @throws SketchesArgumentException if any of the checks fails
 */
static void checkUnionQuickSelectFamily(final Memory mem, final int preambleLongs,
    final int lgNomLongs) {
  //Check Family
  final Family family = Family.idToFamily(extractFamilyID(mem)); //byte 2
  final boolean isUnion = family.equals(Family.UNION);
  final boolean isQuickSelect = !isUnion && family.equals(Family.QUICKSELECT);
  if (!isUnion && !isQuickSelect) {
    throw new SketchesArgumentException(
        "Possible corruption: Invalid Family: " + family.toString());
  }
  if (isUnion && (preambleLongs != Family.UNION.getMinPreLongs())) {
    throw new SketchesArgumentException(
        "Possible corruption: Invalid PreambleLongs value for UNION: " + preambleLongs);
  }
  if (isQuickSelect && (preambleLongs != Family.QUICKSELECT.getMinPreLongs())) {
    throw new SketchesArgumentException(
        "Possible corruption: Invalid PreambleLongs value for QUICKSELECT: " + preambleLongs);
  }

  //Check lgNomLongs
  final int minLgNomLongs = ThetaUtil.MIN_LG_NOM_LONGS;
  if (lgNomLongs < minLgNomLongs) {
    throw new SketchesArgumentException(
        "Possible corruption: Current Memory lgNomLongs < min required size: "
        + lgNomLongs + " < " + minLgNomLongs);
  }
}
/**
 * Validates the given sketch image and throws on the first check that fails. The checks run in
 * this order: serialization version, flags, seed hash, Memory capacity, and finally the
 * relationship between theta and p.
 * @param srcMem the Memory image to validate
 * @param expectedSeed the seed whose hash must match the seed hash of the image
 * @param preambleLongs the preamble size in longs, used to compute the minimum required capacity
 * @param lgNomLongs the Log base 2 of the nominal entries
 * @param lgArrLongs the Log base 2 of the hash table size, used to compute the minimum required
 * capacity
 * @throws SketchesArgumentException if the serialization version, flags, capacity or theta
 * checks fail
 */
static void checkMemIntegrity(final Memory srcMem, final long expectedSeed, final int preambleLongs,
    final int lgNomLongs, final int lgArrLongs) {
  //Serialization version, byte 1
  final int imageSerVer = extractSerVer(srcMem);
  if (imageSerVer != SER_VER) {
    throw new SketchesArgumentException(
        "Possible corruption: Invalid Serialization Version: " + imageSerVer);
  }

  //Flags, byte 5: none of the four flags below may be set
  final int imageFlags = extractFlags(srcMem);
  final int disallowedFlags =
      ORDERED_FLAG_MASK | COMPACT_FLAG_MASK | READ_ONLY_FLAG_MASK | BIG_ENDIAN_FLAG_MASK;
  final int offendingFlags = imageFlags & disallowedFlags;
  if (offendingFlags > 0) {
    throw new SketchesArgumentException(
        "Possible corruption: Input srcMem cannot be: big-endian, compact, ordered, or read-only");
  }

  //Seed hash, bytes 6,7
  final short imageSeedHash = checkMemorySeedHash(srcMem, expectedSeed);
  ThetaUtil.checkSeedHashes(imageSeedHash, ThetaUtil.computeSeedHash(expectedSeed));

  //Memory capacity versus the size implied by lgArrLongs and preambleLongs
  final long availableBytes = srcMem.getCapacity();
  final int requiredBytes = getMemBytes(lgArrLongs, preambleLongs);
  if (availableBytes < requiredBytes) {
    throw new SketchesArgumentException(
        "Possible corruption: Current Memory size < min required size: "
        + availableBytes + " < " + requiredBytes);
  }

  //Theta and p
  final float p = extractP(srcMem); //bytes 12-15
  final long thetaLong = extractThetaLong(srcMem); //bytes 16-23
  final double theta = thetaLong / LONG_MAX_VALUE_AS_DOUBLE;
  //While lgArrLongs <= lgNomLongs the sketch is still resizing, thus theta cannot be < p.
  final boolean stillResizing = lgArrLongs <= lgNomLongs;
  if (stillResizing && (theta < p)) {
    throw new SketchesArgumentException(
        "Possible corruption: Theta cannot be < p and lgArrLongs <= lgNomLongs. "
        + lgArrLongs + " <= " + lgNomLongs + ", Theta: " + theta + ", p: " + p);
  }
}
/**
 * This checks to see if the memory RF factor was set correctly as early versions may not
 * have set it.
 *
 * <p>With lgT = lgNomLongs + 1 and lgR being the Log base 2 of the resize factor stored in the
 * image: when lgR is zero the factor is considered correct only if lgArrLongs equals lgT;
 * otherwise it is considered correct only if (lgT - lgArrLongs) is an exact multiple of lgR.</p>
 *
 * @param srcMem the source memory
 * @param lgNomLongs the current lgNomLongs
 * @param lgArrLongs the current lgArrLongs
 * @return true if the memory RF factor is incorrect and the caller can either
 * correct it or throw an error.
 */
static boolean isResizeFactorIncorrect(final Memory srcMem, final int lgNomLongs,
    final int lgArrLongs) {
  final int lgTarget = lgNomLongs + 1;
  final int lgResizeFactor = extractLgResizeFactor(srcMem);
  if (lgResizeFactor == 0) {
    return lgArrLongs != lgTarget;
  }
  final int remainder = (lgTarget - lgArrLongs) % lgResizeFactor;
  return remainder != 0;
}
}