| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.datasketches.quantiles; |
| |
| import static org.apache.datasketches.quantiles.PreambleUtil.COMBINED_BUFFER; |
| import static org.apache.datasketches.quantiles.PreambleUtil.EMPTY_FLAG_MASK; |
| import static org.apache.datasketches.quantiles.PreambleUtil.FLAGS_BYTE; |
| import static org.apache.datasketches.quantiles.PreambleUtil.MAX_DOUBLE; |
| import static org.apache.datasketches.quantiles.PreambleUtil.MIN_DOUBLE; |
| import static org.apache.datasketches.quantiles.PreambleUtil.N_LONG; |
| import static org.apache.datasketches.quantiles.PreambleUtil.extractFamilyID; |
| import static org.apache.datasketches.quantiles.PreambleUtil.extractFlags; |
| import static org.apache.datasketches.quantiles.PreambleUtil.extractK; |
| import static org.apache.datasketches.quantiles.PreambleUtil.extractN; |
| import static org.apache.datasketches.quantiles.PreambleUtil.extractPreLongs; |
| import static org.apache.datasketches.quantiles.PreambleUtil.extractSerVer; |
| import static org.apache.datasketches.quantiles.PreambleUtil.insertFamilyID; |
| import static org.apache.datasketches.quantiles.PreambleUtil.insertFlags; |
| import static org.apache.datasketches.quantiles.PreambleUtil.insertK; |
| import static org.apache.datasketches.quantiles.PreambleUtil.insertMaxDouble; |
| import static org.apache.datasketches.quantiles.PreambleUtil.insertMinDouble; |
| import static org.apache.datasketches.quantiles.PreambleUtil.insertN; |
| import static org.apache.datasketches.quantiles.PreambleUtil.insertPreLongs; |
| import static org.apache.datasketches.quantiles.PreambleUtil.insertSerVer; |
| import static org.apache.datasketches.quantiles.Util.computeBitPattern; |
| |
| import org.apache.datasketches.Family; |
| import org.apache.datasketches.memory.MemoryRequestServer; |
| import org.apache.datasketches.memory.WritableMemory; |
| |
| /** |
| * Implements the DoublesSketch off-heap. |
| * |
| * @author Kevin Lang |
| * @author Lee Rhodes |
| * |
| */ |
| final class DirectUpdateDoublesSketch extends DirectUpdateDoublesSketchR { |
| MemoryRequestServer memReqSvr = null; |
| |
| |
| private DirectUpdateDoublesSketch(final int k) { |
| super(k); //Checks k |
| } |
| |
| /** |
| * Obtains a new Direct instance of a DoublesSketch, which may be off-heap. |
| * |
| * @param k Parameter that controls space usage of sketch and accuracy of estimates. |
| * Must be greater than 1 and less than 65536 and a power of 2. |
| * @param dstMem the destination Memory that will be initialized to hold the data for this sketch. |
| * It must initially be at least (16 * MIN_K + 32) bytes, where MIN_K defaults to 2. As it grows |
| * it will request more memory using the MemoryRequest callback. |
| * @return a DirectUpdateDoublesSketch |
| */ |
| static DirectUpdateDoublesSketch newInstance(final int k, final WritableMemory dstMem) { |
| // must be able to hold at least an empty sketch |
| final long memCap = dstMem.getCapacity(); |
| checkDirectMemCapacity(k, 0, memCap); |
| |
| //initialize dstMem |
| dstMem.putLong(0, 0L); //clear pre0 |
| insertPreLongs(dstMem, 2); |
| insertSerVer(dstMem, DoublesSketch.DOUBLES_SER_VER); |
| insertFamilyID(dstMem, Family.QUANTILES.getID()); |
| insertFlags(dstMem, EMPTY_FLAG_MASK); |
| insertK(dstMem, k); |
| |
| if (memCap >= COMBINED_BUFFER) { |
| insertN(dstMem, 0L); |
| insertMinDouble(dstMem, Double.NaN); |
| insertMaxDouble(dstMem, Double.NaN); |
| } |
| |
| final DirectUpdateDoublesSketch dds = new DirectUpdateDoublesSketch(k); |
| dds.mem_ = dstMem; |
| return dds; |
| } |
| |
| /** |
| * Wrap this sketch around the given non-compact Memory image of a DoublesSketch. |
| * |
| * @param srcMem the given non-compact Memory image of a DoublesSketch that may have data |
| * @return a sketch that wraps the given srcMem |
| */ |
| static DirectUpdateDoublesSketch wrapInstance(final WritableMemory srcMem) { |
| final long memCap = srcMem.getCapacity(); |
| |
| final int preLongs = extractPreLongs(srcMem); |
| final int serVer = extractSerVer(srcMem); |
| final int familyID = extractFamilyID(srcMem); |
| final int flags = extractFlags(srcMem); |
| final int k = extractK(srcMem); |
| |
| final boolean empty = (flags & EMPTY_FLAG_MASK) > 0; //Preamble flags empty state |
| final long n = empty ? 0 : extractN(srcMem); |
| |
| //VALIDITY CHECKS |
| checkPreLongs(preLongs); |
| Util.checkFamilyID(familyID); |
| DoublesUtil.checkDoublesSerVer(serVer, MIN_DIRECT_DOUBLES_SER_VER); |
| checkDirectFlags(flags); //Cannot be compact |
| Util.checkK(k); |
| checkCompact(serVer, flags); |
| checkDirectMemCapacity(k, n, memCap); |
| checkEmptyAndN(empty, n); |
| |
| final DirectUpdateDoublesSketch dds = new DirectUpdateDoublesSketch(k); |
| dds.mem_ = srcMem; |
| return dds; |
| } |
| |
| @Override |
| public void update(final double dataItem) { |
| if (Double.isNaN(dataItem)) { return; } |
| |
| final int curBBCount = getBaseBufferCount(); |
| final int newBBCount = curBBCount + 1; //derived, not stored |
| |
| //must check memory capacity before we put anything in it |
| final int combBufItemCap = getCombinedBufferItemCapacity(); |
| if (newBBCount > combBufItemCap) { |
| //only changes combinedBuffer when it is only a base buffer |
| mem_ = growCombinedMemBuffer(2 * getK()); |
| } |
| |
| final long curN = getN(); |
| final long newN = curN + 1; |
| |
| if (curN == 0) { //set min and max values |
| putMaxValue(dataItem); |
| putMinValue(dataItem); |
| } else { |
| if (dataItem > getMaxValue()) { putMaxValue(dataItem); } |
| if (dataItem < getMinValue()) { putMinValue(dataItem); } |
| } |
| |
| mem_.putDouble(COMBINED_BUFFER + ((long) curBBCount * Double.BYTES), dataItem); //put the item |
| mem_.putByte(FLAGS_BYTE, (byte) 0); //not compact, not ordered, not empty |
| |
| if (newBBCount == (2 * k_)) { //Propagate |
| // make sure there will be enough levels for the propagation |
| final int curMemItemCap = getCombinedBufferItemCapacity(); |
| final int itemSpaceNeeded = DoublesUpdateImpl.getRequiredItemCapacity(k_, newN); |
| |
| //check mem has capacity to accommodate new level |
| if (itemSpaceNeeded > curMemItemCap) { |
| // copies base buffer plus old levels, adds space for new level |
| mem_ = growCombinedMemBuffer(itemSpaceNeeded); |
| } |
| |
| // sort base buffer via accessor which modifies the underlying base buffer, |
| // then use as one of the inputs to propagate-carry |
| final DoublesSketchAccessor bbAccessor = DoublesSketchAccessor.wrap(this, true); |
| bbAccessor.sort(); |
| |
| final long newBitPattern = DoublesUpdateImpl.inPlacePropagateCarry( |
| 0, // starting level |
| null, |
| bbAccessor, |
| true, |
| k_, |
| DoublesSketchAccessor.wrap(this, true), |
| getBitPattern() |
| ); |
| |
| assert newBitPattern == computeBitPattern(k_, newN); // internal consistency check |
| //bit pattern on direct is always derived, no need to save it. |
| } |
| putN(newN); |
| } |
| |
| @Override |
| public void reset() { |
| if (mem_.getCapacity() >= COMBINED_BUFFER) { |
| mem_.putByte(FLAGS_BYTE, (byte) EMPTY_FLAG_MASK); //not compact, not ordered |
| mem_.putLong(N_LONG, 0L); |
| mem_.putDouble(MIN_DOUBLE, Double.NaN); |
| mem_.putDouble(MAX_DOUBLE, Double.NaN); |
| } |
| } |
| |
| //Restricted overrides |
| //Puts |
| |
| @Override |
| void putMinValue(final double minValue) { |
| assert (mem_.getCapacity() >= COMBINED_BUFFER); |
| mem_.putDouble(MIN_DOUBLE, minValue); |
| } |
| |
| @Override |
| void putMaxValue(final double maxValue) { |
| assert (mem_.getCapacity() >= COMBINED_BUFFER); |
| mem_.putDouble(MAX_DOUBLE, maxValue); |
| } |
| |
| @Override |
| void putN(final long n) { |
| assert (mem_.getCapacity() >= COMBINED_BUFFER); |
| mem_.putLong(N_LONG, n); |
| } |
| |
| @Override |
| void putCombinedBuffer(final double[] combinedBuffer) { |
| mem_.putDoubleArray(COMBINED_BUFFER, combinedBuffer, 0, combinedBuffer.length); |
| } |
| |
| @Override |
| void putBaseBufferCount(final int baseBufferCount) { |
| //intentionally a no-op, not kept on-heap, always derived. |
| } |
| |
| @Override |
| void putBitPattern(final long bitPattern) { |
| //intentionally a no-op, not kept on-heap, always derived. |
| } |
| |
| @Override |
| double[] growCombinedBuffer(final int curCombBufItemCap, final int itemSpaceNeeded) { |
| mem_ = growCombinedMemBuffer(itemSpaceNeeded); |
| // copy out any data that was there |
| final double[] newCombBuf = new double[itemSpaceNeeded]; |
| mem_.getDoubleArray(COMBINED_BUFFER, newCombBuf, 0, curCombBufItemCap); |
| return newCombBuf; |
| } |
| |
| //Direct supporting methods |
| |
| private WritableMemory growCombinedMemBuffer(final int itemSpaceNeeded) { |
| final long memBytes = mem_.getCapacity(); |
| final int needBytes = (itemSpaceNeeded << 3) + COMBINED_BUFFER; //+ preamble + min & max |
| assert needBytes > memBytes; |
| |
| memReqSvr = (memReqSvr == null) ? mem_.getMemoryRequestServer() : memReqSvr; |
| |
| final WritableMemory newMem = memReqSvr.request(needBytes); |
| |
| mem_.copyTo(0, newMem, 0, memBytes); |
| |
| memReqSvr.requestClose(mem_, newMem); |
| |
| return newMem; |
| } |
| } |