blob: 1426368f1a69e4e2a33f2a2ee142a30d223c22f5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.datasketches.theta;
import static org.apache.datasketches.common.Family.idToFamily;
import static org.apache.datasketches.theta.PreambleUtil.COMPACT_FLAG_MASK;
import static org.apache.datasketches.theta.PreambleUtil.EMPTY_FLAG_MASK;
import static org.apache.datasketches.theta.PreambleUtil.ORDERED_FLAG_MASK;
import static org.apache.datasketches.theta.PreambleUtil.READ_ONLY_FLAG_MASK;
import static org.apache.datasketches.theta.PreambleUtil.extractFamilyID;
import static org.apache.datasketches.theta.PreambleUtil.extractFlags;
import static org.apache.datasketches.theta.PreambleUtil.extractPreLongs;
import static org.apache.datasketches.theta.PreambleUtil.extractSeedHash;
import static org.apache.datasketches.theta.PreambleUtil.extractSerVer;
import static org.apache.datasketches.theta.PreambleUtil.extractEntryBitsV4;
import static org.apache.datasketches.theta.PreambleUtil.extractNumEntriesBytesV4;
import static org.apache.datasketches.theta.PreambleUtil.extractThetaLongV4;
import static org.apache.datasketches.theta.SingleItemSketch.otherCheckForSingleItem;
import org.apache.datasketches.common.Family;
import org.apache.datasketches.common.SketchesArgumentException;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.datasketches.thetacommon.ThetaUtil;
/**
* The parent class of all the CompactSketches. CompactSketches are never created directly.
* They are created as a result of the compact() method of an UpdateSketch, a result of a
* getResult() of a SetOperation, or from a heapify method.
*
* <p>A CompactSketch is the simplest form of a Theta Sketch. It consists of a compact list
* (i.e., no intervening spaces) of hash values, which may be ordered or not, a value for theta
* and a seed hash. A CompactSketch is immutable (read-only),
* and the space required when stored is only the space required for the hash values and 8 to 24
* bytes of preamble. An empty CompactSketch consumes only 8 bytes.</p>
*
* @author Lee Rhodes
*/
public abstract class CompactSketch extends Sketch {
/**
* Heapify takes a CompactSketch image in Memory and instantiates an on-heap CompactSketch.
*
* <p>The resulting sketch will not retain any link to the source Memory and all of its data will be
* copied to the heap CompactSketch.</p>
*
* <p>This method assumes that the sketch image was created with the correct hash seed, so it is not checked.
* The resulting on-heap CompactSketch will be given the seedHash derived from the given sketch image.
* However, Serial Version 1 sketch images do not have a seedHash field,
* so the resulting heapified CompactSketch will be given the hash of the DEFAULT_UPDATE_SEED.</p>
*
* @param srcMem an image of a CompactSketch.
* <a href="{@docRoot}/resources/dictionary.html#mem">See Memory</a>.
* @return a CompactSketch on the heap.
*/
public static CompactSketch heapify(final Memory srcMem) {
return heapify(srcMem, ThetaUtil.DEFAULT_UPDATE_SEED, false);
}
/**
* Heapify takes a CompactSketch image in Memory and instantiates an on-heap CompactSketch.
*
* <p>The resulting sketch will not retain any link to the source Memory and all of its data will be
* copied to the heap CompactSketch.</p>
*
* <p>This method checks if the given expectedSeed was used to create the source Memory image.
* However, SerialVersion 1 sketch images cannot be checked as they don't have a seedHash field,
* so the resulting heapified CompactSketch will be given the hash of the expectedSeed.</p>
*
* @param srcMem an image of a CompactSketch that was created using the given expectedSeed.
* <a href="{@docRoot}/resources/dictionary.html#mem">See Memory</a>.
* @param expectedSeed the seed used to validate the given Memory image.
* <a href="{@docRoot}/resources/dictionary.html#seed">See Update Hash Seed</a>.
* @return a CompactSketch on the heap.
*/
public static CompactSketch heapify(final Memory srcMem, final long expectedSeed) {
return heapify(srcMem, expectedSeed, true);
}
private static CompactSketch heapify(final Memory srcMem, final long seed, final boolean enforceSeed) {
final int serVer = extractSerVer(srcMem);
final int familyID = extractFamilyID(srcMem);
final Family family = idToFamily(familyID);
if (family != Family.COMPACT) {
throw new IllegalArgumentException("Corrupted: " + family + " is not Compact!");
}
if (serVer == 4) {
return heapifyV4(srcMem, seed, enforceSeed);
}
if (serVer == 3) {
final int flags = extractFlags(srcMem);
final boolean srcOrdered = (flags & ORDERED_FLAG_MASK) != 0;
final boolean empty = (flags & EMPTY_FLAG_MASK) != 0;
if (enforceSeed && !empty) { PreambleUtil.checkMemorySeedHash(srcMem, seed); }
return CompactOperations.memoryToCompact(srcMem, srcOrdered, null);
}
//not SerVer 3, assume compact stored form
final short seedHash = ThetaUtil.computeSeedHash(seed);
if (serVer == 1) {
return ForwardCompatibility.heapify1to3(srcMem, seedHash);
}
if (serVer == 2) {
return ForwardCompatibility.heapify2to3(srcMem,
enforceSeed ? seedHash : (short) extractSeedHash(srcMem));
}
throw new SketchesArgumentException("Unknown Serialization Version: " + serVer);
}
/**
* Wrap takes the CompactSketch image in given Memory and refers to it directly.
* There is no data copying onto the java heap.
* The wrap operation enables fast read-only merging and access to all the public read-only API.
*
* <p>Only "Direct" Serialization Version 3 (i.e, OpenSource) sketches that have
* been explicitly stored as direct sketches can be wrapped.
* Wrapping earlier serial version sketches will result in a heapify operation.
* These early versions were never designed to "wrap".</p>
*
* <p>Wrapping any subclass of this class that is empty or contains only a single item will
* result in heapified forms of empty and single item sketch respectively.
* This is actually faster and consumes less overall memory.</p>
*
* <p>This method assumes that the sketch image was created with the correct hash seed, so it is not checked.
* However, Serial Version 1 sketch images do not have a seedHash field,
* so the resulting on-heap CompactSketch will be given the hash of the DEFAULT_UPDATE_SEED.</p>
*
* @param srcMem an image of a Sketch.
* <a href="{@docRoot}/resources/dictionary.html#mem">See Memory</a>.
* @return a CompactSketch backed by the given Memory except as above.
*/
public static CompactSketch wrap(final Memory srcMem) {
return wrap(srcMem, ThetaUtil.DEFAULT_UPDATE_SEED, false);
}
/**
* Wrap takes the sketch image in the given Memory and refers to it directly.
* There is no data copying onto the java heap.
* The wrap operation enables fast read-only merging and access to all the public read-only API.
*
* <p>Only "Direct" Serialization Version 3 (i.e, OpenSource) sketches that have
* been explicitly stored as direct sketches can be wrapped.
* Wrapping earlier serial version sketches will result in a heapify operation.
* These early versions were never designed to "wrap".</p>
*
* <p>Wrapping any subclass of this class that is empty or contains only a single item will
* result in heapified forms of empty and single item sketch respectively.
* This is actually faster and consumes less overall memory.</p>
*
* <p>This method checks if the given expectedSeed was used to create the source Memory image.
* However, SerialVersion 1 sketches cannot be checked as they don't have a seedHash field,
* so the resulting heapified CompactSketch will be given the hash of the expectedSeed.</p>
*
* @param srcMem an image of a Sketch that was created using the given expectedSeed.
* <a href="{@docRoot}/resources/dictionary.html#mem">See Memory</a>
* @param expectedSeed the seed used to validate the given Memory image.
* <a href="{@docRoot}/resources/dictionary.html#seed">See Update Hash Seed</a>.
* @return a CompactSketch backed by the given Memory except as above.
*/
public static CompactSketch wrap(final Memory srcMem, final long expectedSeed) {
return wrap(srcMem, expectedSeed, true);
}
private static CompactSketch wrap(final Memory srcMem, final long seed, final boolean enforceSeed) {
final int serVer = extractSerVer(srcMem);
final int familyID = extractFamilyID(srcMem);
final Family family = Family.idToFamily(familyID);
if (family != Family.COMPACT) {
throw new IllegalArgumentException("Corrupted: " + family + " is not Compact!");
}
final short seedHash = ThetaUtil.computeSeedHash(seed);
if (serVer == 4) {
// not wrapping the compressed format since currently we cannot take advantage of
// decompression during iteration because set operations reach into memory directly
return heapifyV4(srcMem, seed, enforceSeed);
}
else if (serVer == 3) {
if (PreambleUtil.isEmptyFlag(srcMem)) {
return EmptyCompactSketch.getHeapInstance(srcMem);
}
if (otherCheckForSingleItem(srcMem)) {
return SingleItemSketch.heapify(srcMem, enforceSeed ? seedHash : (short) extractSeedHash(srcMem));
}
//not empty & not singleItem
final int flags = extractFlags(srcMem);
final boolean compactFlag = (flags & COMPACT_FLAG_MASK) > 0;
if (!compactFlag) {
throw new SketchesArgumentException(
"Corrupted: COMPACT family sketch image must have compact flag set");
}
final boolean readOnly = (flags & READ_ONLY_FLAG_MASK) > 0;
if (!readOnly) {
throw new SketchesArgumentException(
"Corrupted: COMPACT family sketch image must have Read-Only flag set");
}
return DirectCompactSketch.wrapInstance(srcMem,
enforceSeed ? seedHash : (short) extractSeedHash(srcMem));
} //end of serVer 3
else if (serVer == 1) {
return ForwardCompatibility.heapify1to3(srcMem, seedHash);
}
else if (serVer == 2) {
return ForwardCompatibility.heapify2to3(srcMem,
enforceSeed ? seedHash : (short) extractSeedHash(srcMem));
}
throw new SketchesArgumentException(
"Corrupted: Serialization Version " + serVer + " not recognized.");
}
//Sketch Overrides
@Override
public abstract CompactSketch compact(final boolean dstOrdered, final WritableMemory dstMem);
@Override
public int getCompactBytes() {
return getCurrentBytes();
}
@Override
int getCurrentDataLongs() {
return getRetainedEntries(true);
}
@Override
public Family getFamily() {
return Family.COMPACT;
}
@Override
public boolean isCompact() {
return true;
}
/**
* gets the sketch as a compressed byte array
* @return the sketch as a compressed byte array
*/
public byte[] toByteArrayCompressed() {
if (!isOrdered() || getRetainedEntries() == 0 || (getRetainedEntries() == 1 && !isEstimationMode())) {
return toByteArray();
}
return toByteArrayV4();
}
private int computeMinLeadingZeros() {
// compression is based on leading zeros in deltas between ordered hash values
// assumes ordered sketch
long previous = 0;
long ored = 0;
final HashIterator it = iterator();
while (it.next()) {
final long delta = it.get() - previous;
ored |= delta;
previous = it.get();
}
return Long.numberOfLeadingZeros(ored);
}
private static int wholeBytesToHoldBits(final int bits) {
return (bits >>> 3) + ((bits & 7) > 0 ? 1 : 0);
}
private byte[] toByteArrayV4() {
final int preambleLongs = isEstimationMode() ? 2 : 1;
final int entryBits = 64 - computeMinLeadingZeros();
final int compressedBits = entryBits * getRetainedEntries();
// store num_entries as whole bytes since whole-byte blocks will follow (most probably)
final int numEntriesBytes = wholeBytesToHoldBits(32 - Integer.numberOfLeadingZeros(getRetainedEntries()));
final int size = preambleLongs * Long.BYTES + numEntriesBytes + wholeBytesToHoldBits(compressedBits);
final byte[] bytes = new byte[size];
final WritableMemory mem = WritableMemory.writableWrap(bytes);
int offsetBytes = 0;
mem.putByte(offsetBytes++, (byte) preambleLongs);
mem.putByte(offsetBytes++, (byte) 4); // to do: add constant
mem.putByte(offsetBytes++, (byte) Family.COMPACT.getID());
mem.putByte(offsetBytes++, (byte) entryBits);
mem.putByte(offsetBytes++, (byte) numEntriesBytes);
mem.putByte(offsetBytes++, (byte) (COMPACT_FLAG_MASK | READ_ONLY_FLAG_MASK | ORDERED_FLAG_MASK));
mem.putShort(offsetBytes, getSeedHash());
offsetBytes += Short.BYTES;
if (isEstimationMode()) {
mem.putLong(offsetBytes, getThetaLong());
offsetBytes += Long.BYTES;
}
int numEntries = getRetainedEntries();
for (int i = 0; i < numEntriesBytes; i++) {
mem.putByte(offsetBytes++, (byte) (numEntries & 0xff));
numEntries >>>= 8;
}
long previous = 0;
final long[] deltas = new long[8];
final HashIterator it = iterator();
int i;
for (i = 0; i + 7 < getRetainedEntries(); i += 8) {
for (int j = 0; j < 8; j++) {
it.next();
deltas[j] = it.get() - previous;
previous = it.get();
}
BitPacking.packBitsBlock8(deltas, 0, bytes, offsetBytes, entryBits);
offsetBytes += entryBits;
}
int offsetBits = 0;
for (; i < getRetainedEntries(); i++) {
it.next();
final long delta = it.get() - previous;
previous = it.get();
BitPacking.packBits(delta, entryBits, bytes, offsetBytes, offsetBits);
offsetBytes += (offsetBits + entryBits) >>> 3;
offsetBits = (offsetBits + entryBits) & 7;
}
return bytes;
}
private static CompactSketch heapifyV4(final Memory srcMem, final long seed, final boolean enforceSeed) {
final int preLongs = extractPreLongs(srcMem);
final int flags = extractFlags(srcMem);
final int entryBits = extractEntryBitsV4(srcMem);
final int numEntriesBytes = extractNumEntriesBytesV4(srcMem);
final short seedHash = (short) extractSeedHash(srcMem);
final boolean isEmpty = (flags & EMPTY_FLAG_MASK) > 0;
if (enforceSeed && !isEmpty) { PreambleUtil.checkMemorySeedHash(srcMem, seed); }
int offsetBytes = 8;
long theta = Long.MAX_VALUE;
if (preLongs > 1) {
theta = extractThetaLongV4(srcMem);
offsetBytes += Long.BYTES;
}
int numEntries = 0;
for (int i = 0; i < numEntriesBytes; i++) {
numEntries |= Byte.toUnsignedInt(srcMem.getByte(offsetBytes++)) << (i << 3);
}
final long[] entries = new long[numEntries];
final byte[] bytes = new byte[entryBits]; // temporary buffer for unpacking
int i;
for (i = 0; i + 7 < numEntries; i += 8) {
srcMem.getByteArray(offsetBytes, bytes, 0, entryBits);
BitPacking.unpackBitsBlock8(entries, i, bytes, 0, entryBits);
offsetBytes += entryBits;
}
if (i < numEntries) {
srcMem.getByteArray(offsetBytes, bytes, 0, wholeBytesToHoldBits((numEntries - i) * entryBits));
int offsetBits = 0;
offsetBytes = 0;
for (; i < numEntries; i++) {
BitPacking.unpackBits(entries, i, entryBits, bytes, offsetBytes, offsetBits);
offsetBytes += (offsetBits + entryBits) >>> 3;
offsetBits = (offsetBits + entryBits) & 7;
}
}
// undo deltas
long previous = 0;
for (i = 0; i < numEntries; i++) {
entries[i] += previous;
previous = entries[i];
}
return new HeapCompactSketch(entries, isEmpty, seedHash, numEntries, theta, true);
}
}