| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.datasketches.theta; |
| |
| import static org.apache.datasketches.Util.DEFAULT_NOMINAL_ENTRIES; |
| import static org.apache.datasketches.Util.DEFAULT_UPDATE_SEED; |
| import static org.apache.datasketches.Util.LS; |
| import static org.apache.datasketches.Util.MAX_LG_NOM_LONGS; |
| import static org.apache.datasketches.Util.MIN_LG_NOM_LONGS; |
| import static org.apache.datasketches.Util.TAB; |
| import static org.apache.datasketches.Util.ceilingPowerOf2; |
| import static org.apache.datasketches.Util.checkNomLongs; |
| |
| import org.apache.datasketches.Family; |
| import org.apache.datasketches.ResizeFactor; |
| import org.apache.datasketches.SketchesArgumentException; |
| import org.apache.datasketches.SketchesStateException; |
| import org.apache.datasketches.memory.DefaultMemoryRequestServer; |
| import org.apache.datasketches.memory.MemoryRequestServer; |
| import org.apache.datasketches.memory.WritableMemory; |
| |
| /** |
| * For building a new UpdateSketch. |
| * |
| * @author Lee Rhodes |
| */ |
| public class UpdateSketchBuilder { |
| private int bLgNomLongs; |
| private long bSeed; |
| private ResizeFactor bRF; |
| private Family bFam; |
| private float bP; |
| private MemoryRequestServer bMemReqSvr; |
| |
| //Fields for concurrent theta sketch |
| private int bNumPoolThreads; |
| private int bLocalLgNomLongs; |
| private boolean bPropagateOrderedCompact; |
| private double bMaxConcurrencyError; |
| private int bMaxNumLocalThreads; |
| |
| /** |
| * Constructor for building a new UpdateSketch. The default configuration is |
| * <ul> |
| * <li>Nominal Entries: {@value org.apache.datasketches.Util#DEFAULT_NOMINAL_ENTRIES}</li> |
| * <li>Seed: {@value org.apache.datasketches.Util#DEFAULT_UPDATE_SEED}</li> |
| * <li>Input Sampling Probability: 1.0</li> |
| * <li>Family: {@link org.apache.datasketches.Family#QUICKSELECT}</li> |
| * <li>Resize Factor: The default for sketches on the Java heap is {@link ResizeFactor#X8}. |
| * For direct sketches, which are targeted for native memory off the Java heap, this value will |
| * be fixed at either {@link ResizeFactor#X1} or {@link ResizeFactor#X2}.</li> |
| * <li>MemoryRequestServer (Direct only): |
| * {@link org.apache.datasketches.memory.DefaultMemoryRequestServer}.</li> |
| * </ul> |
| * Parameters unique to the concurrent sketches only: |
| * <ul> |
| * <li>Number of local Nominal Entries: 4</li> |
| * <li>Concurrent NumPoolThreads: 3</li> |
| * <li>Concurrent PropagateOrderedCompact: true</li> |
| * <li>Concurrent MaxConcurrencyError: 0</li> |
| * </ul> |
| */ |
| public UpdateSketchBuilder() { |
| bLgNomLongs = Integer.numberOfTrailingZeros(DEFAULT_NOMINAL_ENTRIES); |
| bSeed = DEFAULT_UPDATE_SEED; |
| bP = (float) 1.0; |
| bRF = ResizeFactor.X8; |
| bFam = Family.QUICKSELECT; |
| bMemReqSvr = new DefaultMemoryRequestServer(); |
| // Default values for concurrent sketch |
| bNumPoolThreads = ConcurrentPropagationService.NUM_POOL_THREADS; |
| bLocalLgNomLongs = 4; //default is smallest legal QS sketch |
| bPropagateOrderedCompact = true; |
| bMaxConcurrencyError = 0; |
| bMaxNumLocalThreads = 1; |
| } |
| |
| /** |
| * Sets the Nominal Entries for this sketch. |
| * This value is also used for building a shared concurrent sketch. |
| * The minimum value is 16 (2^4) and the maximum value is 67,108,864 (2^26). |
| * Be aware that sketches as large as this maximum value may not have been |
| * thoroughly tested or characterized for performance. |
| * |
| * @param nomEntries <a href="{@docRoot}/resources/dictionary.html#nomEntries">Nominal Entries</a> |
| * This will become the ceiling power of 2 if the given value is not. |
| * @return this UpdateSketchBuilder |
| */ |
| public UpdateSketchBuilder setNominalEntries(final int nomEntries) { |
| bLgNomLongs = checkNomLongs(nomEntries); |
| return this; |
| } |
| |
| /** |
| * Alternative method of setting the Nominal Entries for this sketch from the log_base2 value. |
| * This value is also used for building a shared concurrent sketch. |
| * The minimum value is 4 and the maximum value is 26. |
| * Be aware that sketches as large as this maximum value may not have been |
| * thoroughly tested or characterized for performance. |
| * |
| * @param lgNomEntries the Log Nominal Entries. Also for the concurrent shared sketch |
| * @return this UpdateSketchBuilder |
| */ |
| public UpdateSketchBuilder setLogNominalEntries(final int lgNomEntries) { |
| bLgNomLongs = checkNomLongs(1 << lgNomEntries); |
| return this; |
| } |
| |
| /** |
| * Returns Log-base 2 Nominal Entries |
| * @return Log-base 2 Nominal Entries |
| */ |
| public int getLgNominalEntries() { |
| return bLgNomLongs; |
| } |
| |
| /** |
| * Sets the Nominal Entries for the concurrent local sketch. The minimum value is 16 and the |
| * maximum value is 67,108,864, which is 2^26. |
| * Be aware that sketches as large as this maximum |
| * value have not been thoroughly tested or characterized for performance. |
| * |
| * @param nomEntries <a href="{@docRoot}/resources/dictionary.html#nomEntries">Nominal Entries</a> |
| * This will become the ceiling power of 2 if it is not. |
| * @return this UpdateSketchBuilder |
| */ |
| public UpdateSketchBuilder setLocalNominalEntries(final int nomEntries) { |
| bLocalLgNomLongs = Integer.numberOfTrailingZeros(ceilingPowerOf2(nomEntries)); |
| if ((bLocalLgNomLongs > MAX_LG_NOM_LONGS) || (bLocalLgNomLongs < MIN_LG_NOM_LONGS)) { |
| throw new SketchesArgumentException( |
| "Nominal Entries must be >= 16 and <= 67108864: " + nomEntries); |
| } |
| return this; |
| } |
| |
| /** |
| * Alternative method of setting the Nominal Entries for a local concurrent sketch from the |
| * log_base2 value. |
| * The minimum value is 4 and the maximum value is 26. |
| * Be aware that sketches as large as this maximum |
| * value have not been thoroughly tested or characterized for performance. |
| * |
| * @param lgNomEntries the Log Nominal Entries for a concurrent local sketch |
| * @return this UpdateSketchBuilder |
| */ |
| public UpdateSketchBuilder setLocalLogNominalEntries(final int lgNomEntries) { |
| bLocalLgNomLongs = lgNomEntries; |
| if ((bLocalLgNomLongs > MAX_LG_NOM_LONGS) || (bLocalLgNomLongs < MIN_LG_NOM_LONGS)) { |
| throw new SketchesArgumentException( |
| "Log Nominal Entries must be >= 4 and <= 26: " + lgNomEntries); |
| } |
| return this; |
| } |
| |
| /** |
| * Returns Log-base 2 Nominal Entries for the concurrent local sketch |
| * @return Log-base 2 Nominal Entries for the concurrent local sketch |
| */ |
| public int getLocalLgNominalEntries() { |
| return bLocalLgNomLongs; |
| } |
| |
| /** |
| * Sets the long seed value that is required by the hashing function. |
| * @param seed <a href="{@docRoot}/resources/dictionary.html#seed">See seed</a> |
| * @return this UpdateSketchBuilder |
| */ |
| public UpdateSketchBuilder setSeed(final long seed) { |
| bSeed = seed; |
| return this; |
| } |
| |
| /** |
| * Returns the seed |
| * @return the seed |
| */ |
| public long getSeed() { |
| return bSeed; |
| } |
| |
| /** |
| * Sets the upfront uniform sampling probability, <i>p</i> |
| * @param p <a href="{@docRoot}/resources/dictionary.html#p">See Sampling Probability, <i>p</i></a> |
| * @return this UpdateSketchBuilder |
| */ |
| public UpdateSketchBuilder setP(final float p) { |
| if ((p <= 0.0) || (p > 1.0)) { |
| throw new SketchesArgumentException("p must be > 0 and <= 1.0: " + p); |
| } |
| bP = p; |
| return this; |
| } |
| |
| /** |
| * Returns the pre-sampling probability <i>p</i> |
| * @return the pre-sampling probability <i>p</i> |
| */ |
| public float getP() { |
| return bP; |
| } |
| |
| /** |
| * Sets the cache Resize Factor. |
| * @param rf <a href="{@docRoot}/resources/dictionary.html#resizeFactor">See Resize Factor</a> |
| * @return this UpdateSketchBuilder |
| */ |
| public UpdateSketchBuilder setResizeFactor(final ResizeFactor rf) { |
| bRF = rf; |
| return this; |
| } |
| |
| /** |
| * Returns the Resize Factor |
| * @return the Resize Factor |
| */ |
| public ResizeFactor getResizeFactor() { |
| return bRF; |
| } |
| |
| /** |
| * Set the Family. |
| * @param family the family for this builder |
| * @return this UpdateSketchBuilder |
| */ |
| public UpdateSketchBuilder setFamily(final Family family) { |
| bFam = family; |
| return this; |
| } |
| |
| /** |
| * Returns the Family |
| * @return the Family |
| */ |
| public Family getFamily() { |
| return bFam; |
| } |
| |
| /** |
| * Set the MemoryRequestServer |
| * @param memReqSvr the given MemoryRequestServer |
| * @return this UpdateSketchBuilder |
| */ |
| public UpdateSketchBuilder setMemoryRequestServer(final MemoryRequestServer memReqSvr) { |
| bMemReqSvr = memReqSvr; |
| return this; |
| } |
| |
| /** |
| * Returns the MemoryRequestServer |
| * @return the MemoryRequestServer |
| */ |
| public MemoryRequestServer getMemoryRequestServer() { |
| return bMemReqSvr; |
| } |
| |
| /** |
| * Sets the number of pool threads used for background propagation in the concurrent sketches. |
| * @param numPoolThreads the given number of pool threads |
| */ |
| public void setNumPoolThreads(final int numPoolThreads) { |
| bNumPoolThreads = numPoolThreads; |
| } |
| |
| /** |
| * Gets the number of background pool threads used for propagation in the concurrent sketches. |
| * @return the number of background pool threads |
| */ |
| public int getNumPoolThreads() { |
| return bNumPoolThreads; |
| } |
| |
| /** |
| * Sets the Propagate Ordered Compact flag to the given value. Used with concurrent sketches. |
| * |
| * @param prop the given value |
| * @return this UpdateSketchBuilder |
| */ |
| public UpdateSketchBuilder setPropagateOrderedCompact(final boolean prop) { |
| bPropagateOrderedCompact = prop; |
| return this; |
| } |
| |
| /** |
| * Gets the Propagate Ordered Compact flag used with concurrent sketches. |
| * @return the Propagate Ordered Compact flag |
| */ |
| public boolean getPropagateOrderedCompact() { |
| return bPropagateOrderedCompact; |
| } |
| |
| /** |
| * Sets the Maximum Concurrency Error. |
| * @param maxConcurrencyError the given Maximum Concurrency Error. |
| */ |
| public void setMaxConcurrencyError(final double maxConcurrencyError) { |
| bMaxConcurrencyError = maxConcurrencyError; |
| } |
| |
| /** |
| * Gets the Maximum Concurrency Error |
| * @return the Maximum Concurrency Error |
| */ |
| public double getMaxConcurrencyError() { |
| return bMaxConcurrencyError; |
| } |
| |
| /** |
| * Sets the Maximum Number of Local Threads. |
| * This is used to set the size of the local concurrent buffers. |
| * @param maxNumLocalThreads the given Maximum Number of Local Threads |
| */ |
| public void setMaxNumLocalThreads(final int maxNumLocalThreads) { |
| bMaxNumLocalThreads = maxNumLocalThreads; |
| } |
| |
| /** |
| * Gets the Maximum Number of Local Threads. |
| * @return the Maximum Number of Local Threads. |
| */ |
| public int getMaxNumLocalThreads() { |
| return bMaxNumLocalThreads; |
| } |
| |
| // BUILD FUNCTIONS |
| |
| /** |
| * Returns an UpdateSketch with the current configuration of this Builder. |
| * @return an UpdateSketch |
| */ |
| public UpdateSketch build() { |
| return build(null); |
| } |
| |
| /** |
| * Returns an UpdateSketch with the current configuration of this Builder |
| * with the specified backing destination Memory store. |
| * Note: this cannot be used with the Alpha Family of sketches. |
| * @param dstMem The destination Memory. |
| * @return an UpdateSketch |
| */ |
| public UpdateSketch build(final WritableMemory dstMem) { |
| UpdateSketch sketch = null; |
| switch (bFam) { |
| case ALPHA: { |
| if (dstMem == null) { |
| sketch = HeapAlphaSketch.newHeapInstance(bLgNomLongs, bSeed, bP, bRF); |
| } |
| else { |
| throw new SketchesArgumentException("AlphaSketch cannot be made Direct to Memory."); |
| } |
| break; |
| } |
| case QUICKSELECT: { |
| if (dstMem == null) { |
| sketch = new HeapQuickSelectSketch(bLgNomLongs, bSeed, bP, bRF, false); |
| } |
| else { |
| sketch = new DirectQuickSelectSketch( |
| bLgNomLongs, bSeed, bP, bRF, bMemReqSvr, dstMem, false); |
| } |
| break; |
| } |
| default: { |
| throw new SketchesArgumentException( |
| "Given Family cannot be built as a Theta Sketch: " + bFam.toString()); |
| } |
| } |
| return sketch; |
| } |
| |
| /** |
| * Returns an on-heap concurrent shared UpdateSketch with the current configuration of the |
| * Builder. |
| * |
| * <p>The parameters unique to the shared concurrent sketch are: |
| * <ul> |
| * <li>Number of Pool Threads (default is 3)</li> |
| * <li>Maximum Concurrency Error</li> |
| * </ul> |
| * |
| * <p>Key parameters that are in common with other <i>Theta</i> sketches: |
| * <ul> |
| * <li>Nominal Entries or Log Nominal Entries (for the shared concurrent sketch)</li> |
| * </ul> |
| * |
| * @return an on-heap concurrent UpdateSketch with the current configuration of the Builder. |
| */ |
| public UpdateSketch buildShared() { |
| return buildShared(null); |
| } |
| |
| /** |
| * Returns a direct (potentially off-heap) concurrent shared UpdateSketch with the current |
| * configuration of the Builder and the given destination WritableMemory. If the destination |
| * WritableMemory is null, this defaults to an on-heap concurrent shared UpdateSketch. |
| * |
| * <p>The parameters unique to the shared concurrent sketch are: |
| * <ul> |
| * <li>Number of Pool Threads (default is 3)</li> |
| * <li>Maximum Concurrency Error</li> |
| * </ul> |
| * |
| * <p>Key parameters that are in common with other <i>Theta</i> sketches: |
| * <ul> |
| * <li>Nominal Entries or Log Nominal Entries (for the shared concurrent sketch)</li> |
| * <li>Destination Writable Memory (if not null, returned sketch is Direct. Default is null.)</li> |
| * </ul> |
| * |
| * @param dstMem the given WritableMemory for Direct, otherwise <i>null</i>. |
| * @return a concurrent UpdateSketch with the current configuration of the Builder |
| * and the given destination WritableMemory. |
| */ |
| public UpdateSketch buildShared(final WritableMemory dstMem) { |
| ConcurrentPropagationService.NUM_POOL_THREADS = bNumPoolThreads; |
| if (dstMem == null) { |
| return new ConcurrentHeapQuickSelectSketch(bLgNomLongs, bSeed, bMaxConcurrencyError); |
| } else { |
| return new ConcurrentDirectQuickSelectSketch(bLgNomLongs, bSeed, bMaxConcurrencyError, dstMem); |
| } |
| } |
| |
| /** |
| * Returns a direct (potentially off-heap) concurrent shared UpdateSketch with the current |
| * configuration of the Builder, the data from the given sketch, and the given destination |
| * WritableMemory. If the destination WritableMemory is null, this defaults to an on-heap |
| * concurrent shared UpdateSketch. |
| * |
| * <p>The parameters unique to the shared concurrent sketch are: |
| * <ul> |
| * <li>Number of Pool Threads (default is 3)</li> |
| * <li>Maximum Concurrency Error</li> |
| * </ul> |
| * |
| * <p>Key parameters that are in common with other <i>Theta</i> sketches: |
| * <ul> |
| * <li>Nominal Entries or Log Nominal Entries (for the shared concurrent sketch)</li> |
| * <li>Destination Writable Memory (if not null, returned sketch is Direct. Default is null.)</li> |
| * </ul> |
| * |
| * @param sketch a given UpdateSketch from which the data is used to initialize the returned |
| * shared sketch. |
| * @param dstMem the given WritableMemory for Direct, otherwise <i>null</i>. |
| * @return a concurrent UpdateSketch with the current configuration of the Builder |
| * and the given destination WritableMemory. |
| */ |
| public UpdateSketch buildSharedFromSketch(final UpdateSketch sketch, final WritableMemory dstMem) { |
| ConcurrentPropagationService.NUM_POOL_THREADS = bNumPoolThreads; |
| if (dstMem == null) { |
| return new ConcurrentHeapQuickSelectSketch(sketch, bSeed, bMaxConcurrencyError); |
| } else { |
| return new ConcurrentDirectQuickSelectSketch(sketch, bSeed, bMaxConcurrencyError, dstMem); |
| } |
| } |
| |
| /** |
| * Returns a local, on-heap, concurrent UpdateSketch to be used as a per-thread local buffer |
| * along with the given concurrent shared UpdateSketch and the current configuration of this |
| * Builder. |
| * |
| * <p>The parameters unique to the local concurrent sketch are: |
| * <ul> |
| * <li>Local Nominal Entries or Local Log Nominal Entries</li> |
| * <li>Propagate Ordered Compact flag</li> |
| * </ul> |
| * |
| * @param shared the concurrent shared sketch to be accessed via the concurrent local sketch. |
| * @return an UpdateSketch to be used as a per-thread local buffer. |
| */ |
| public UpdateSketch buildLocal(final UpdateSketch shared) { |
| if ((shared == null) || !(shared instanceof ConcurrentSharedThetaSketch)) { |
| throw new SketchesStateException("The concurrent shared sketch must be built first."); |
| } |
| return new ConcurrentHeapThetaBuffer(bLocalLgNomLongs, bSeed, |
| (ConcurrentSharedThetaSketch) shared, bPropagateOrderedCompact, bMaxNumLocalThreads); |
| } |
| |
| @Override |
| public String toString() { |
| final StringBuilder sb = new StringBuilder(); |
| sb.append("UpdateSketchBuilder configuration:").append(LS); |
| sb.append("LgK:").append(TAB).append(bLgNomLongs).append(LS); |
| sb.append("K:").append(TAB).append(1 << bLgNomLongs).append(LS); |
| sb.append("LgLocalK:").append(TAB).append(bLocalLgNomLongs).append(LS); |
| sb.append("LocalK:").append(TAB).append(1 << bLocalLgNomLongs).append(LS); |
| sb.append("Seed:").append(TAB).append(bSeed).append(LS); |
| sb.append("p:").append(TAB).append(bP).append(LS); |
| sb.append("ResizeFactor:").append(TAB).append(bRF).append(LS); |
| sb.append("Family:").append(TAB).append(bFam).append(LS); |
| final String mrsStr = bMemReqSvr.getClass().getSimpleName(); |
| sb.append("MemoryRequestServer:").append(TAB).append(mrsStr).append(LS); |
| sb.append("Propagate Ordered Compact").append(TAB).append(bPropagateOrderedCompact).append(LS); |
| sb.append("NumPoolThreads").append(TAB).append(bNumPoolThreads).append(LS); |
| sb.append("MaxConcurrencyError").append(TAB).append(bMaxConcurrencyError).append(LS); |
| sb.append("MaxNumLocalThreads").append(TAB).append(bMaxNumLocalThreads).append(LS); |
| return sb.toString(); |
| } |
| |
| } |