blob: dca356971832f2f657aa100f9b93ad020abe68e1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.datasketches.theta;
import static org.apache.datasketches.Util.DEFAULT_NOMINAL_ENTRIES;
import static org.apache.datasketches.Util.DEFAULT_UPDATE_SEED;
import static org.apache.datasketches.Util.LS;
import static org.apache.datasketches.Util.MAX_LG_NOM_LONGS;
import static org.apache.datasketches.Util.MIN_LG_NOM_LONGS;
import static org.apache.datasketches.Util.TAB;
import static org.apache.datasketches.Util.ceilingPowerOf2;
import static org.apache.datasketches.Util.checkNomLongs;
import org.apache.datasketches.Family;
import org.apache.datasketches.ResizeFactor;
import org.apache.datasketches.SketchesArgumentException;
import org.apache.datasketches.SketchesStateException;
import org.apache.datasketches.memory.DefaultMemoryRequestServer;
import org.apache.datasketches.memory.MemoryRequestServer;
import org.apache.datasketches.memory.WritableMemory;
/**
* For building a new UpdateSketch.
*
* @author Lee Rhodes
*/
public class UpdateSketchBuilder {
private int bLgNomLongs;
private long bSeed;
private ResizeFactor bRF;
private Family bFam;
private float bP;
private MemoryRequestServer bMemReqSvr;
//Fields for concurrent theta sketch
private int bNumPoolThreads;
private int bLocalLgNomLongs;
private boolean bPropagateOrderedCompact;
private double bMaxConcurrencyError;
private int bMaxNumLocalThreads;
/**
* Constructor for building a new UpdateSketch. The default configuration is
* <ul>
* <li>Nominal Entries: {@value org.apache.datasketches.Util#DEFAULT_NOMINAL_ENTRIES}</li>
* <li>Seed: {@value org.apache.datasketches.Util#DEFAULT_UPDATE_SEED}</li>
* <li>Input Sampling Probability: 1.0</li>
* <li>Family: {@link org.apache.datasketches.Family#QUICKSELECT}</li>
* <li>Resize Factor: The default for sketches on the Java heap is {@link ResizeFactor#X8}.
* For direct sketches, which are targeted for native memory off the Java heap, this value will
* be fixed at either {@link ResizeFactor#X1} or {@link ResizeFactor#X2}.</li>
* <li>MemoryRequestServer (Direct only):
* {@link org.apache.datasketches.memory.DefaultMemoryRequestServer}.</li>
* </ul>
* Parameters unique to the concurrent sketches only:
* <ul>
* <li>Number of local Nominal Entries: 4</li>
* <li>Concurrent NumPoolThreads: 3</li>
* <li>Concurrent PropagateOrderedCompact: true</li>
* <li>Concurrent MaxConcurrencyError: 0</li>
* </ul>
*/
public UpdateSketchBuilder() {
bLgNomLongs = Integer.numberOfTrailingZeros(DEFAULT_NOMINAL_ENTRIES);
bSeed = DEFAULT_UPDATE_SEED;
bP = (float) 1.0;
bRF = ResizeFactor.X8;
bFam = Family.QUICKSELECT;
bMemReqSvr = new DefaultMemoryRequestServer();
// Default values for concurrent sketch
bNumPoolThreads = ConcurrentPropagationService.NUM_POOL_THREADS;
bLocalLgNomLongs = 4; //default is smallest legal QS sketch
bPropagateOrderedCompact = true;
bMaxConcurrencyError = 0;
bMaxNumLocalThreads = 1;
}
/**
* Sets the Nominal Entries for this sketch.
* This value is also used for building a shared concurrent sketch.
* The minimum value is 16 (2^4) and the maximum value is 67,108,864 (2^26).
* Be aware that sketches as large as this maximum value may not have been
* thoroughly tested or characterized for performance.
*
* @param nomEntries <a href="{@docRoot}/resources/dictionary.html#nomEntries">Nominal Entries</a>
* This will become the ceiling power of 2 if the given value is not.
* @return this UpdateSketchBuilder
*/
public UpdateSketchBuilder setNominalEntries(final int nomEntries) {
bLgNomLongs = checkNomLongs(nomEntries);
return this;
}
/**
* Alternative method of setting the Nominal Entries for this sketch from the log_base2 value.
* This value is also used for building a shared concurrent sketch.
* The minimum value is 4 and the maximum value is 26.
* Be aware that sketches as large as this maximum value may not have been
* thoroughly tested or characterized for performance.
*
* @param lgNomEntries the Log Nominal Entries. Also for the concurrent shared sketch
* @return this UpdateSketchBuilder
*/
public UpdateSketchBuilder setLogNominalEntries(final int lgNomEntries) {
bLgNomLongs = checkNomLongs(1 << lgNomEntries);
return this;
}
/**
* Returns Log-base 2 Nominal Entries
* @return Log-base 2 Nominal Entries
*/
public int getLgNominalEntries() {
return bLgNomLongs;
}
/**
* Sets the Nominal Entries for the concurrent local sketch. The minimum value is 16 and the
* maximum value is 67,108,864, which is 2^26.
* Be aware that sketches as large as this maximum
* value have not been thoroughly tested or characterized for performance.
*
* @param nomEntries <a href="{@docRoot}/resources/dictionary.html#nomEntries">Nominal Entries</a>
* This will become the ceiling power of 2 if it is not.
* @return this UpdateSketchBuilder
*/
public UpdateSketchBuilder setLocalNominalEntries(final int nomEntries) {
bLocalLgNomLongs = Integer.numberOfTrailingZeros(ceilingPowerOf2(nomEntries));
if ((bLocalLgNomLongs > MAX_LG_NOM_LONGS) || (bLocalLgNomLongs < MIN_LG_NOM_LONGS)) {
throw new SketchesArgumentException(
"Nominal Entries must be >= 16 and <= 67108864: " + nomEntries);
}
return this;
}
/**
* Alternative method of setting the Nominal Entries for a local concurrent sketch from the
* log_base2 value.
* The minimum value is 4 and the maximum value is 26.
* Be aware that sketches as large as this maximum
* value have not been thoroughly tested or characterized for performance.
*
* @param lgNomEntries the Log Nominal Entries for a concurrent local sketch
* @return this UpdateSketchBuilder
*/
public UpdateSketchBuilder setLocalLogNominalEntries(final int lgNomEntries) {
bLocalLgNomLongs = lgNomEntries;
if ((bLocalLgNomLongs > MAX_LG_NOM_LONGS) || (bLocalLgNomLongs < MIN_LG_NOM_LONGS)) {
throw new SketchesArgumentException(
"Log Nominal Entries must be >= 4 and <= 26: " + lgNomEntries);
}
return this;
}
/**
* Returns Log-base 2 Nominal Entries for the concurrent local sketch
* @return Log-base 2 Nominal Entries for the concurrent local sketch
*/
public int getLocalLgNominalEntries() {
return bLocalLgNomLongs;
}
/**
* Sets the long seed value that is required by the hashing function.
* @param seed <a href="{@docRoot}/resources/dictionary.html#seed">See seed</a>
* @return this UpdateSketchBuilder
*/
public UpdateSketchBuilder setSeed(final long seed) {
bSeed = seed;
return this;
}
/**
* Returns the seed
* @return the seed
*/
public long getSeed() {
return bSeed;
}
/**
* Sets the upfront uniform sampling probability, <i>p</i>
* @param p <a href="{@docRoot}/resources/dictionary.html#p">See Sampling Probability, <i>p</i></a>
* @return this UpdateSketchBuilder
*/
public UpdateSketchBuilder setP(final float p) {
if ((p <= 0.0) || (p > 1.0)) {
throw new SketchesArgumentException("p must be > 0 and <= 1.0: " + p);
}
bP = p;
return this;
}
/**
* Returns the pre-sampling probability <i>p</i>
* @return the pre-sampling probability <i>p</i>
*/
public float getP() {
return bP;
}
/**
* Sets the cache Resize Factor.
* @param rf <a href="{@docRoot}/resources/dictionary.html#resizeFactor">See Resize Factor</a>
* @return this UpdateSketchBuilder
*/
public UpdateSketchBuilder setResizeFactor(final ResizeFactor rf) {
bRF = rf;
return this;
}
/**
* Returns the Resize Factor
* @return the Resize Factor
*/
public ResizeFactor getResizeFactor() {
return bRF;
}
/**
* Set the Family.
* @param family the family for this builder
* @return this UpdateSketchBuilder
*/
public UpdateSketchBuilder setFamily(final Family family) {
bFam = family;
return this;
}
/**
* Returns the Family
* @return the Family
*/
public Family getFamily() {
return bFam;
}
/**
* Set the MemoryRequestServer
* @param memReqSvr the given MemoryRequestServer
* @return this UpdateSketchBuilder
*/
public UpdateSketchBuilder setMemoryRequestServer(final MemoryRequestServer memReqSvr) {
bMemReqSvr = memReqSvr;
return this;
}
/**
* Returns the MemoryRequestServer
* @return the MemoryRequestServer
*/
public MemoryRequestServer getMemoryRequestServer() {
return bMemReqSvr;
}
/**
* Sets the number of pool threads used for background propagation in the concurrent sketches.
* @param numPoolThreads the given number of pool threads
*/
public void setNumPoolThreads(final int numPoolThreads) {
bNumPoolThreads = numPoolThreads;
}
/**
* Gets the number of background pool threads used for propagation in the concurrent sketches.
* @return the number of background pool threads
*/
public int getNumPoolThreads() {
return bNumPoolThreads;
}
/**
* Sets the Propagate Ordered Compact flag to the given value. Used with concurrent sketches.
*
* @param prop the given value
* @return this UpdateSketchBuilder
*/
public UpdateSketchBuilder setPropagateOrderedCompact(final boolean prop) {
bPropagateOrderedCompact = prop;
return this;
}
/**
* Gets the Propagate Ordered Compact flag used with concurrent sketches.
* @return the Propagate Ordered Compact flag
*/
public boolean getPropagateOrderedCompact() {
return bPropagateOrderedCompact;
}
/**
* Sets the Maximum Concurrency Error.
* @param maxConcurrencyError the given Maximum Concurrency Error.
*/
public void setMaxConcurrencyError(final double maxConcurrencyError) {
bMaxConcurrencyError = maxConcurrencyError;
}
/**
* Gets the Maximum Concurrency Error
* @return the Maximum Concurrency Error
*/
public double getMaxConcurrencyError() {
return bMaxConcurrencyError;
}
/**
* Sets the Maximum Number of Local Threads.
* This is used to set the size of the local concurrent buffers.
* @param maxNumLocalThreads the given Maximum Number of Local Threads
*/
public void setMaxNumLocalThreads(final int maxNumLocalThreads) {
bMaxNumLocalThreads = maxNumLocalThreads;
}
/**
* Gets the Maximum Number of Local Threads.
* @return the Maximum Number of Local Threads.
*/
public int getMaxNumLocalThreads() {
return bMaxNumLocalThreads;
}
// BUILD FUNCTIONS
/**
* Returns an UpdateSketch with the current configuration of this Builder.
* @return an UpdateSketch
*/
public UpdateSketch build() {
return build(null);
}
/**
* Returns an UpdateSketch with the current configuration of this Builder
* with the specified backing destination Memory store.
* Note: this cannot be used with the Alpha Family of sketches.
* @param dstMem The destination Memory.
* @return an UpdateSketch
*/
public UpdateSketch build(final WritableMemory dstMem) {
UpdateSketch sketch = null;
switch (bFam) {
case ALPHA: {
if (dstMem == null) {
sketch = HeapAlphaSketch.newHeapInstance(bLgNomLongs, bSeed, bP, bRF);
}
else {
throw new SketchesArgumentException("AlphaSketch cannot be made Direct to Memory.");
}
break;
}
case QUICKSELECT: {
if (dstMem == null) {
sketch = new HeapQuickSelectSketch(bLgNomLongs, bSeed, bP, bRF, false);
}
else {
sketch = new DirectQuickSelectSketch(
bLgNomLongs, bSeed, bP, bRF, bMemReqSvr, dstMem, false);
}
break;
}
default: {
throw new SketchesArgumentException(
"Given Family cannot be built as a Theta Sketch: " + bFam.toString());
}
}
return sketch;
}
/**
* Returns an on-heap concurrent shared UpdateSketch with the current configuration of the
* Builder.
*
* <p>The parameters unique to the shared concurrent sketch are:
* <ul>
* <li>Number of Pool Threads (default is 3)</li>
* <li>Maximum Concurrency Error</li>
* </ul>
*
* <p>Key parameters that are in common with other <i>Theta</i> sketches:
* <ul>
* <li>Nominal Entries or Log Nominal Entries (for the shared concurrent sketch)</li>
* </ul>
*
* @return an on-heap concurrent UpdateSketch with the current configuration of the Builder.
*/
public UpdateSketch buildShared() {
return buildShared(null);
}
/**
* Returns a direct (potentially off-heap) concurrent shared UpdateSketch with the current
* configuration of the Builder and the given destination WritableMemory. If the destination
* WritableMemory is null, this defaults to an on-heap concurrent shared UpdateSketch.
*
* <p>The parameters unique to the shared concurrent sketch are:
* <ul>
* <li>Number of Pool Threads (default is 3)</li>
* <li>Maximum Concurrency Error</li>
* </ul>
*
* <p>Key parameters that are in common with other <i>Theta</i> sketches:
* <ul>
* <li>Nominal Entries or Log Nominal Entries (for the shared concurrent sketch)</li>
* <li>Destination Writable Memory (if not null, returned sketch is Direct. Default is null.)</li>
* </ul>
*
* @param dstMem the given WritableMemory for Direct, otherwise <i>null</i>.
* @return a concurrent UpdateSketch with the current configuration of the Builder
* and the given destination WritableMemory.
*/
public UpdateSketch buildShared(final WritableMemory dstMem) {
ConcurrentPropagationService.NUM_POOL_THREADS = bNumPoolThreads;
if (dstMem == null) {
return new ConcurrentHeapQuickSelectSketch(bLgNomLongs, bSeed, bMaxConcurrencyError);
} else {
return new ConcurrentDirectQuickSelectSketch(bLgNomLongs, bSeed, bMaxConcurrencyError, dstMem);
}
}
/**
* Returns a direct (potentially off-heap) concurrent shared UpdateSketch with the current
* configuration of the Builder, the data from the given sketch, and the given destination
* WritableMemory. If the destination WritableMemory is null, this defaults to an on-heap
* concurrent shared UpdateSketch.
*
* <p>The parameters unique to the shared concurrent sketch are:
* <ul>
* <li>Number of Pool Threads (default is 3)</li>
* <li>Maximum Concurrency Error</li>
* </ul>
*
* <p>Key parameters that are in common with other <i>Theta</i> sketches:
* <ul>
* <li>Nominal Entries or Log Nominal Entries (for the shared concurrent sketch)</li>
* <li>Destination Writable Memory (if not null, returned sketch is Direct. Default is null.)</li>
* </ul>
*
* @param sketch a given UpdateSketch from which the data is used to initialize the returned
* shared sketch.
* @param dstMem the given WritableMemory for Direct, otherwise <i>null</i>.
* @return a concurrent UpdateSketch with the current configuration of the Builder
* and the given destination WritableMemory.
*/
public UpdateSketch buildSharedFromSketch(final UpdateSketch sketch, final WritableMemory dstMem) {
ConcurrentPropagationService.NUM_POOL_THREADS = bNumPoolThreads;
if (dstMem == null) {
return new ConcurrentHeapQuickSelectSketch(sketch, bSeed, bMaxConcurrencyError);
} else {
return new ConcurrentDirectQuickSelectSketch(sketch, bSeed, bMaxConcurrencyError, dstMem);
}
}
/**
* Returns a local, on-heap, concurrent UpdateSketch to be used as a per-thread local buffer
* along with the given concurrent shared UpdateSketch and the current configuration of this
* Builder.
*
* <p>The parameters unique to the local concurrent sketch are:
* <ul>
* <li>Local Nominal Entries or Local Log Nominal Entries</li>
* <li>Propagate Ordered Compact flag</li>
* </ul>
*
* @param shared the concurrent shared sketch to be accessed via the concurrent local sketch.
* @return an UpdateSketch to be used as a per-thread local buffer.
*/
public UpdateSketch buildLocal(final UpdateSketch shared) {
if ((shared == null) || !(shared instanceof ConcurrentSharedThetaSketch)) {
throw new SketchesStateException("The concurrent shared sketch must be built first.");
}
return new ConcurrentHeapThetaBuffer(bLocalLgNomLongs, bSeed,
(ConcurrentSharedThetaSketch) shared, bPropagateOrderedCompact, bMaxNumLocalThreads);
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
sb.append("UpdateSketchBuilder configuration:").append(LS);
sb.append("LgK:").append(TAB).append(bLgNomLongs).append(LS);
sb.append("K:").append(TAB).append(1 << bLgNomLongs).append(LS);
sb.append("LgLocalK:").append(TAB).append(bLocalLgNomLongs).append(LS);
sb.append("LocalK:").append(TAB).append(1 << bLocalLgNomLongs).append(LS);
sb.append("Seed:").append(TAB).append(bSeed).append(LS);
sb.append("p:").append(TAB).append(bP).append(LS);
sb.append("ResizeFactor:").append(TAB).append(bRF).append(LS);
sb.append("Family:").append(TAB).append(bFam).append(LS);
final String mrsStr = bMemReqSvr.getClass().getSimpleName();
sb.append("MemoryRequestServer:").append(TAB).append(mrsStr).append(LS);
sb.append("Propagate Ordered Compact").append(TAB).append(bPropagateOrderedCompact).append(LS);
sb.append("NumPoolThreads").append(TAB).append(bNumPoolThreads).append(LS);
sb.append("MaxConcurrencyError").append(TAB).append(bMaxConcurrencyError).append(LS);
sb.append("MaxNumLocalThreads").append(TAB).append(bMaxNumLocalThreads).append(LS);
return sb.toString();
}
}