blob: dcc773996a287cbf8556cdc012c0d504a8d12a17 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.datasketches.theta;
import static java.lang.Math.min;
import static java.lang.foreign.ValueLayout.JAVA_LONG_UNALIGNED;
import static org.apache.datasketches.common.QuickSelect.selectExcludingZeros;
import static org.apache.datasketches.theta.PreambleUtil.COMPACT_FLAG_MASK;
import static org.apache.datasketches.theta.PreambleUtil.ORDERED_FLAG_MASK;
import static org.apache.datasketches.theta.PreambleUtil.UNION_THETA_LONG;
import static org.apache.datasketches.theta.PreambleUtil.clearEmpty;
import static org.apache.datasketches.theta.PreambleUtil.extractFamilyID;
import static org.apache.datasketches.theta.PreambleUtil.extractUnionThetaLong;
import static org.apache.datasketches.theta.PreambleUtil.insertUnionThetaLong;
import java.lang.foreign.MemorySegment;
import java.nio.ByteBuffer;
import java.util.Objects;
import org.apache.datasketches.common.Family;
import org.apache.datasketches.common.ResizeFactor;
import org.apache.datasketches.common.SketchesArgumentException;
import org.apache.datasketches.common.Util;
import org.apache.datasketches.thetacommon.HashOperations;
/**
* Shared code for the HeapUnion and DirectUnion implementations.
*
* @author Lee Rhodes
* @author Kevin Lang
*/
final class UnionImpl extends Union {
/**
* The internal buffer ("gadget") that receives every hash offered to this union.
*
* <p>Although the gadget object is initially an UpdateSketch, in the context of a Union it is used
* as a specialized buffer that happens to leverage much of the machinery of an UpdateSketch.
* However, in this context some of the key invariants of the sketch algorithm are intentionally
* violated as an optimization. As a result this object cannot be considered as an UpdateSketch
* and should never be exported as an UpdateSketch. Its internal state is not necessarily
* finalized and may contain garbage. Also its internal concept of "nominal entries" or "k" can
* be meaningless. It is private for very good reasons.
*/
private final UpdateSketch gadget_;
//Computed once in the constructor from the seed and compared against the seed hash of every
//incoming sketch.
private final short expectedSeedHash_; //eliminates having to compute the seedHash on every union.
//The union's own theta, tracked separately from the gadget's theta. When the gadget is backed by
//a MemorySegment this value is also written into, and read back from, that segment.
private long unionThetaLong_; //when on-heap, this is the only copy
//The union's own empty state, tracked separately from the gadget's empty state. When the gadget
//is backed by a MemorySegment the segment's empty flag is cleared alongside this field.
private boolean unionEmpty_; //when on-heap, this is the only copy
/**
 * Binds this union to its internal gadget and caches the seed hash computed from the given seed.
 * Instances are created only through the static factory methods of this class.
 *
 * @param gadget the sketch that serves as this union's internal buffer
 * @param seed the seed from which the expected seed hash is computed
 */
private UnionImpl(final UpdateSketch gadget, final long seed) {
  this.gadget_ = gadget;
  this.expectedSeedHash_ = Util.computeSeedHash(seed);
}
/**
 * Creates a new, on-heap Union SetOperation.
 * Called by SetOperationBuilder.
 *
 * <p>The union's theta and empty state are initialized from the freshly created gadget.</p>
 *
 * @param lgNomLongs <a href="{@docRoot}/resources/dictionary.html#lgNomLogs">See lgNomLongs</a>
 * @param seed <a href="{@docRoot}/resources/dictionary.html#seed">See seed</a>
 * @param p <a href="{@docRoot}/resources/dictionary.html#p">See Sampling Probability, <i>p</i></a>
 * @param rf <a href="{@docRoot}/resources/dictionary.html#resizeFactor">See Resize Factor</a>
 * @return a new heap-based UnionImpl
 */
static UnionImpl initNewHeapInstance(
    final int lgNomLongs,
    final long seed,
    final float p,
    final ResizeFactor rf) {
  //the trailing 'true' creates the gadget with the UNION family
  final UpdateSketch heapGadget = new HeapQuickSelectSketch(lgNomLongs, seed, p, rf, true);
  final UnionImpl result = new UnionImpl(heapGadget, seed);
  result.unionThetaLong_ = heapGadget.getThetaLong();
  result.unionEmpty_ = heapGadget.isEmpty();
  return result;
}
/**
 * Creates a new Direct Union whose gadget lives in the given destination MemorySegment.
 * Called by SetOperationBuilder.
 *
 * <p>The union's theta and empty state are initialized from the freshly created gadget.</p>
 *
 * @param lgNomLongs <a href="{@docRoot}/resources/dictionary.html#lgNomLogs">See lgNomLongs</a>.
 * @param seed <a href="{@docRoot}/resources/dictionary.html#seed">See seed</a>
 * @param p <a href="{@docRoot}/resources/dictionary.html#p">See Sampling Probability, <i>p</i></a>
 * @param rf <a href="{@docRoot}/resources/dictionary.html#resizeFactor">See Resize Factor</a>
 * @param dstSeg the given MemorySegment object destination. It will be cleared prior to use.
 * @return a new UnionImpl backed by <i>dstSeg</i>
 */
static UnionImpl initNewDirectInstance(
    final int lgNomLongs,
    final long seed,
    final float p,
    final ResizeFactor rf,
    final MemorySegment dstSeg) {
  //the trailing 'true' creates the gadget with the UNION family
  final UpdateSketch directGadget =
      new DirectQuickSelectSketch(lgNomLongs, seed, p, rf, dstSeg, null, true);
  final UnionImpl result = new UnionImpl(directGadget, seed);
  result.unionThetaLong_ = directGadget.getThetaLong();
  result.unionEmpty_ = directGadget.isEmpty();
  return result;
}
/**
 * Builds an on-heap Union from a MemorySegment image of a Union that contains data.
 * Called by SetOperation.
 *
 * <p>The source is only read, through a read-only view. The family ID is checked first. The
 * union's theta and empty state are then taken from the image's preamble rather than from the
 * heapified gadget.</p>
 *
 * @param srcSeg The source MemorySegment Union object.
 * @param expectedSeed the seed used to validate the given MemorySegment image.
 * <a href="{@docRoot}/resources/dictionary.html#seed">See seed</a>
 * @return a new heap-based UnionImpl
 */
static UnionImpl heapifyInstance(final MemorySegment srcSeg, final long expectedSeed) {
  final MemorySegment roView = srcSeg.asReadOnly();
  Family.UNION.checkFamilyID(extractFamilyID(roView));
  final UpdateSketch heapGadget = HeapQuickSelectSketch.heapifyInstance(roView, expectedSeed);
  final UnionImpl result = new UnionImpl(heapGadget, expectedSeed);
  result.unionThetaLong_ = extractUnionThetaLong(roView);
  result.unionEmpty_ = PreambleUtil.isEmptyFlag(roView);
  return result;
}
/**
 * Fast-wraps a Union object around a Union MemorySegment object containing data.
 *
 * <p>The only check this method performs itself is the family-ID check. The gadget is then
 * created with the "fast" wrap variant of the underlying sketch: the read-only variant when the
 * segment is read-only, the writable variant otherwise. The union's theta and empty state are
 * read from the segment's preamble.</p>
 *
 * @param srcSeg The source MemorySegment object.
 * @param expectedSeed the seed used to validate the given MemorySegment image.
 * <a href="{@docRoot}/resources/dictionary.html#seed">See seed</a>
 * @return a UnionImpl wrapping <i>srcSeg</i>
 */
static UnionImpl fastWrapInstance(final MemorySegment srcSeg, final long expectedSeed) {
  Family.UNION.checkFamilyID(extractFamilyID(srcSeg));
  final UpdateSketch wrappedGadget;
  if (srcSeg.isReadOnly()) {
    wrappedGadget = DirectQuickSelectSketchR.fastReadOnlyWrap(srcSeg, expectedSeed);
  } else {
    wrappedGadget = DirectQuickSelectSketch.fastWritableWrap(srcSeg, null, expectedSeed);
  }
  final UnionImpl result = new UnionImpl(wrappedGadget, expectedSeed);
  result.unionThetaLong_ = extractUnionThetaLong(srcSeg);
  result.unionEmpty_ = PreambleUtil.isEmptyFlag(srcSeg);
  return result;
}
/**
 * Wraps a Union object around a Union MemorySegment object containing data.
 * Called by SetOperation and Union.
 *
 * <p>The family ID is checked first. The gadget is a read-only wrap when the segment is
 * read-only and a writable wrap otherwise. The union's theta and empty state are read from the
 * segment's preamble.</p>
 *
 * @param srcSeg The source MemorySegment object.
 * @param expectedSeed the seed used to validate the given MemorySegment image.
 * <a href="{@docRoot}/resources/dictionary.html#seed">See seed</a>
 * @return a UnionImpl wrapping <i>srcSeg</i>
 */
static UnionImpl wrapInstance(final MemorySegment srcSeg, final long expectedSeed) {
  Family.UNION.checkFamilyID(extractFamilyID(srcSeg));
  final UpdateSketch wrappedGadget;
  if (srcSeg.isReadOnly()) {
    wrappedGadget = DirectQuickSelectSketchR.readOnlyWrap(srcSeg, expectedSeed);
  } else {
    wrappedGadget = DirectQuickSelectSketch.writableWrap(srcSeg, null, expectedSeed);
  }
  final UnionImpl result = new UnionImpl(wrappedGadget, expectedSeed);
  result.unionThetaLong_ = extractUnionThetaLong(srcSeg);
  result.unionEmpty_ = PreambleUtil.isEmptyFlag(srcSeg);
  return result;
}
/**
 * Returns the current byte count as reported by the internal gadget.
 */
@Override
public int getCurrentBytes() {
  final int gadgetBytes = gadget_.getCurrentBytes();
  return gadgetBytes;
}
/**
 * Returns the maximum size of this union in bytes: <code>16 &lt;&lt; lgNomLongs</code> bytes
 * for the data plus eight bytes for each of the UNION family's maximum preamble longs.
 */
@Override
public int getMaxUnionBytes() {
  final int dataBytes = 16 << gadget_.getLgNomLongs();
  final int preambleBytes = Family.UNION.getMaxPreLongs() << 3; //longs to bytes
  return dataBytes + preambleBytes;
}
/**
 * Returns the gadget's MemorySegment, or null when this union has no MemorySegment.
 */
@Override
MemorySegment getMemorySegment() {
  if (hasMemorySegment()) {
    return gadget_.getMemorySegment();
  }
  return null;
}
/**
 * Returns the result of this union as an ordered CompactSketch, with no destination
 * MemorySegment supplied.
 */
@Override
public CompactSketch getResult() {
  final boolean ordered = true;
  final MemorySegment noDestination = null;
  return getResult(ordered, noDestination);
}
/**
 * Returns the current state of this union as a CompactSketch.
 *
 * <p>Works on a copy of the gadget's cache. If the gadget retains more than k entries, theta is
 * pulled back using a quick-select on that copy. The output theta is the minimum of the
 * gadget's theta, the pulled-back theta and the union's theta. The retained count is recounted
 * against the output theta whenever that theta is below the gadget's theta.</p>
 *
 * @param dstOrdered true if the resulting sketch is to be ordered
 * @param dstSeg the destination MemorySegment handed to the compacting operation; may be null
 * @return the result of this union as a CompactSketch
 */
@Override
public CompactSketch getResult(final boolean dstOrdered, final MemorySegment dstSeg) {
  final int retained = gadget_.getRetainedEntries(true);
  final int nomEntries = 1 << gadget_.getLgNomLongs();

  //Only the heap case clones explicitly; presumably the segment-backed getCache() already
  //hands back a fresh array -- TODO confirm.
  final long[] workCache;
  if (gadget_.hasMemorySegment()) {
    workCache = gadget_.getCache();
  } else {
    workCache = gadget_.getCache().clone();
  }

  //Pull back to k
  final long gadgetTheta = gadget_.getThetaLong();
  long pulledBackTheta = gadgetTheta;
  if (retained > nomEntries) {
    pulledBackTheta = selectExcludingZeros(workCache, retained, nomEntries + 1);
  }

  //Finalize Theta and curCount
  final long storedUnionTheta;
  if (gadget_.hasMemorySegment()) {
    storedUnionTheta = gadget_.getMemorySegment().get(JAVA_LONG_UNALIGNED, UNION_THETA_LONG);
  } else {
    storedUnionTheta = unionThetaLong_;
  }
  final long thetaOut = min(min(gadgetTheta, pulledBackTheta), storedUnionTheta);
  int countOut = retained;
  if (thetaOut < gadgetTheta) {
    countOut = HashOperations.count(workCache, thetaOut);
  }

  //Compact the cache
  final long[] compacted =
      CompactOperations.compactCache(workCache, countOut, thetaOut, dstOrdered);
  final boolean emptyOut = gadget_.isEmpty() && unionEmpty_;
  final short seedHash = gadget_.getSeedHash();
  return CompactOperations.componentsToCompact(
      thetaOut, countOut, seedHash, emptyOut, true, dstOrdered, dstOrdered, dstSeg, compacted);
}
/**
 * Returns whatever the internal gadget reports for hasMemorySegment().
 */
@Override
public boolean hasMemorySegment() {
  final boolean gadgetHasSegment = gadget_.hasMemorySegment();
  return gadgetHasSegment;
}
/**
 * Returns whatever the internal gadget reports for isOffHeap().
 */
@Override
public boolean isOffHeap() {
  final boolean gadgetOffHeap = gadget_.isOffHeap();
  return gadgetOffHeap;
}
/**
 * Asks the internal gadget whether it shares its resource with the given MemorySegment.
 *
 * @param that the MemorySegment to compare against
 * @return the gadget's answer to isSameResource(that)
 */
@Override
public boolean isSameResource(final MemorySegment that) {
  final boolean same = gadget_.isSameResource(that);
  return same;
}
/**
 * Resets the internal gadget, then re-initializes the union's own theta and empty state from
 * the reset gadget.
 */
@Override
public void reset() {
  gadget_.reset();
  this.unionThetaLong_ = gadget_.getThetaLong();
  this.unionEmpty_ = gadget_.isEmpty();
}
/**
 * Serializes this union to a byte array.
 *
 * <p>Starts from the gadget's byte-array image and writes the union's theta into it. If the
 * gadget's empty state differs from the union's, the image's empty flag is cleared and the
 * union's empty state is set to false.</p>
 *
 * @return the byte-array image of this union
 */
@Override
public byte[] toByteArray() {
  final byte[] image = gadget_.toByteArray();
  final MemorySegment imageSeg = MemorySegment.ofArray(image);
  insertUnionThetaLong(imageSeg, unionThetaLong_);
  final boolean emptyStatesAgree = gadget_.isEmpty() == unionEmpty_;
  if (!emptyStatesAgree) {
    clearEmpty(imageSeg);
    unionEmpty_ = false;
  }
  return image;
}
/**
 * Stateless union of two sketches: this union is reset, both sketches are merged in, the
 * result is extracted, and this union is reset again before returning.
 *
 * @param sketchA the first sketch to merge
 * @param sketchB the second sketch to merge
 * @param dstOrdered true if the resulting sketch is to be ordered
 * @param dstSeg the destination MemorySegment passed on to getResult; may be null
 * @return the union of the two sketches as a CompactSketch
 */
@Override //Stateless Union
public CompactSketch union(final ThetaSketch sketchA, final ThetaSketch sketchB, final boolean dstOrdered,
    final MemorySegment dstSeg) {
  reset(); //discard anything previously accumulated
  union(sketchA);
  union(sketchB);
  final CompactSketch result = getResult(dstOrdered, dstSeg);
  reset(); //leave no state behind
  return result;
}
/**
* Merges the given sketch into this union.
*
* <p>A null or empty input leaves this union unchanged. Otherwise the input's seed hash is
* checked against the expected seed hash. A SingleItemSketch is handled by passing its one hash
* directly to the gadget. For any other sketch, the union's theta is lowered to the minimum of
* the union's theta, the input's theta and the gadget's theta, and every input hash below both
* the union's theta and the gadget's current theta is offered to the gadget. When the gadget is
* backed by a MemorySegment, the resulting union theta is written into that segment and the
* segment's empty flag is cleared.</p>
*
* @param sketchIn the sketch to be merged; may be null or empty
*/
@Override
public void union(final ThetaSketch sketchIn) {
//UNION Empty Rule: AND the empty states.
if (sketchIn == null || sketchIn.isEmpty()) {
//null and empty is interpreted as (Theta = 1.0, count = 0, empty = T). Nothing changes
return;
}
//sketchIn is valid and not empty
Util.checkSeedHashes(expectedSeedHash_, sketchIn.getSeedHash());
if (sketchIn instanceof SingleItemSketch) {
//Only the gadget is updated on this path; unionThetaLong_, unionEmpty_ and any backing
//MemorySegment preamble are not touched here.
gadget_.hashUpdate(sketchIn.getCache()[0]);
return;
}
//Verify that the Compact and Ordered flags in the input's MemorySegment (if any) agree with
//what the sketch object itself reports.
UnionImpl.checkSketchAndMemorySegmentFlags(sketchIn);
unionThetaLong_ = min(min(unionThetaLong_, sketchIn.getThetaLong()), gadget_.getThetaLong()); //Theta rule
unionEmpty_ = false;
final boolean isOrdered = sketchIn.isOrdered();
final HashIterator it = sketchIn.iterator();
while (it.next()) {
final long hash = it.get();
//The gadget's theta is re-read on every iteration rather than cached, presumably because
//hashUpdate can lower it -- TODO confirm.
if (hash < unionThetaLong_ && hash < gadget_.getThetaLong()) {
gadget_.hashUpdate(hash); // backdoor update, hash function is bypassed
//Early stop for ordered input: presumably the remaining hashes are no smaller than this
//one, so none of them could pass the test above -- verify ordering is ascending.
} else if (isOrdered) { break; }
}
unionThetaLong_ = min(unionThetaLong_, gadget_.getThetaLong()); //Theta rule with gadget
if (gadget_.hasMemorySegment()) {
//Keep the segment's preamble in step with the fields updated above.
final MemorySegment wseg = gadget_.getMemorySegment();
PreambleUtil.insertUnionThetaLong(wseg, unionThetaLong_);
PreambleUtil.clearEmpty(wseg);
}
}
/**
 * Merges the sketch image held in the given MemorySegment into this union. The segment is
 * wrapped as a ThetaSketch through a read-only view and handed to {@code union(ThetaSketch)}.
 *
 * @param seg the MemorySegment holding a sketch image; must not be null
 * @throws NullPointerException if <i>seg</i> is null
 */
@Override
public void union(final MemorySegment seg) {
  Objects.requireNonNull(seg, "MemorySegment must be non-null");
  final MemorySegment roView = seg.asReadOnly();
  union(ThetaSketch.wrap(roView));
}
/**
 * Presents the given long datum to the internal gadget.
 *
 * @param datum the long value to update with
 */
@Override
public void update(final long datum) {
  this.gadget_.update(datum);
}

/**
 * Presents the given double datum to the internal gadget.
 *
 * @param datum the double value to update with
 */
@Override
public void update(final double datum) {
  this.gadget_.update(datum);
}

/**
 * Presents the given String datum to the internal gadget.
 *
 * @param datum the String to update with
 */
@Override
public void update(final String datum) {
  this.gadget_.update(datum);
}

/**
 * Presents the given byte array to the internal gadget.
 *
 * @param data the byte array to update with
 */
@Override
public void update(final byte[] data) {
  this.gadget_.update(data);
}

/**
 * Presents the given ByteBuffer to the internal gadget.
 *
 * @param data the ByteBuffer to update with
 */
@Override
public void update(final ByteBuffer data) {
  this.gadget_.update(data);
}

/**
 * Presents the given char array to the internal gadget.
 *
 * @param data the char array to update with
 */
@Override
public void update(final char[] data) {
  this.gadget_.update(data);
}

/**
 * Presents the given int array to the internal gadget.
 *
 * @param data the int array to update with
 */
@Override
public void update(final int[] data) {
  this.gadget_.update(data);
}

/**
 * Presents the given long array to the internal gadget.
 *
 * @param data the long array to update with
 */
@Override
public void update(final long[] data) {
  this.gadget_.update(data);
}
//Restricted

/**
 * Returns the cache array as provided by the internal gadget.
 */
@Override
long[] getCache() {
  final long[] gadgetCache = gadget_.getCache();
  return gadgetCache;
}
/**
 * Returns the gadget's retained-entry count, requested with the argument {@code true}.
 */
@Override
int getRetainedEntries() {
  final int retained = gadget_.getRetainedEntries(true);
  return retained;
}
/**
 * Returns the seed hash as reported by the internal gadget.
 */
@Override
short getSeedHash() {
  final short gadgetSeedHash = gadget_.getSeedHash();
  return gadgetSeedHash;
}
/**
 * Returns the smaller of the union's own theta and the gadget's theta.
 */
@Override
long getThetaLong() {
  final long gadgetTheta = gadget_.getThetaLong();
  return min(unionThetaLong_, gadgetTheta);
}
/**
 * Returns true only when both the gadget and the union's own empty state are empty.
 */
@Override
boolean isEmpty() {
  if (!gadget_.isEmpty()) {
    return false;
  }
  return unionEmpty_;
}
/**
 * Verifies that the Compact and Ordered flags stored in a sketch's MemorySegment agree with
 * what the sketch object itself reports. Does nothing when the sketch has no MemorySegment.
 *
 * @param sketch the given sketch
 * @throws SketchesArgumentException if either flag disagrees with the sketch
 */
private static final void checkSketchAndMemorySegmentFlags(final ThetaSketch sketch) {
  final MemorySegment seg = sketch.getMemorySegment();
  if (seg != null) {
    final int flags = PreambleUtil.extractFlags(seg);

    final boolean segSaysCompact = (flags & COMPACT_FLAG_MASK) > 0;
    if (segSaysCompact != sketch.isCompact()) {
      throw new SketchesArgumentException("Possible corruption: "
          + "MemorySegment Compact Flag inconsistent with Sketch");
    }

    final boolean segSaysOrdered = (flags & ORDERED_FLAG_MASK) > 0;
    if (segSaysOrdered != sketch.isOrdered()) {
      throw new SketchesArgumentException("Possible corruption: "
          + "MemorySegment Ordered Flag inconsistent with Sketch");
    }
  }
}
}