/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
| |
| package org.apache.datasketches.sampling; |
| |
| import static org.apache.datasketches.common.Util.LS; |
| |
| import java.lang.reflect.Array; |
| import java.util.ArrayList; |
| import java.util.Random; |
| import java.util.concurrent.ThreadLocalRandom; |
| |
| import org.apache.datasketches.common.SketchesArgumentException; |
| |
| // this is a supporting class used to hold the raw data sample |
| final class EbppsItemsSample<T> { |
| |
| private double c_; // Current sample size, including fractional part |
| private T partialItem_; // a sample item corresponding to a partial weight |
| private ArrayList<T> data_; // full sample items |
| |
| private Random rand_; // ThreadLocalRandom.current() in general |
| |
| // basic constructor |
| EbppsItemsSample(final int reservedSize) { |
| c_ = 0.0; |
| data_ = new ArrayList<>(reservedSize); |
| rand_ = ThreadLocalRandom.current(); |
| } |
| |
| // copy constructor used during merge |
| EbppsItemsSample(final EbppsItemsSample<T> other) { |
| c_ = other.c_; |
| partialItem_ = other.partialItem_; |
| data_ = new ArrayList<>(other.data_); |
| rand_ = other.rand_; |
| } |
| |
| // constructor used for deserialization and testing |
| // does NOT copy the incoming ArrayList since this is an internal |
| // class's package-private constructor, not something directly |
| // taking user data |
| EbppsItemsSample(final ArrayList<T> data, final T partialItem, final double c) { |
| if (c < 0.0 || Double.isNaN(c) || Double.isInfinite(c)) { |
| throw new SketchesArgumentException("C must be nonnegative and finite. Found: " + c); |
| } |
| |
| c_ = c; |
| partialItem_ = partialItem; |
| data_ = data; |
| rand_ = ThreadLocalRandom.current(); |
| } |
| |
| // Used in lieu of a constructor to populate a temporary sample |
| // with data before immediately merging it. This approach |
| // avoids excessive object allocation calls. |
| // rand_ is not set since it is not expected to be used from |
| // this object |
| void replaceContent(final T item, final double theta) { |
| if (theta < 0.0 || theta > 1.0 || Double.isNaN(theta)) { |
| throw new SketchesArgumentException("Theta must be in the range [0.0, 1.0]. Found: " + theta); |
| } |
| |
| c_ = theta; |
| if (theta == 1.0) { |
| if (data_ != null && data_.size() == 1) { |
| data_.set(0, item); |
| } else { |
| data_ = new ArrayList<>(1); |
| data_.add(item); |
| } |
| partialItem_ = null; |
| } else { |
| data_ = null; |
| partialItem_ = item; |
| } |
| } |
| |
| void reset() { |
| c_ = 0.0; |
| partialItem_ = null; |
| data_.clear(); |
| } |
| |
| ArrayList<T> getSample() { |
| final double cFrac = c_ % 1; |
| final boolean includePartial = partialItem_ != null && rand_.nextDouble() < cFrac; |
| final int resultSize = (data_ != null ? data_.size() : 0) + (includePartial ? 1 : 0); |
| |
| if (resultSize == 0) { |
| return null; |
| } |
| |
| final ArrayList<T> result = new ArrayList<>(resultSize); |
| if (data_ != null) { |
| result.addAll(data_); |
| } |
| |
| if (includePartial) { |
| result.add(partialItem_); |
| } |
| |
| return result; |
| } |
| |
| @SuppressWarnings("unchecked") |
| T[] getAllSamples(final Class<?> clazz) { |
| // Is it faster to use sublist and append 1? |
| final T[] itemsArray = (T[]) Array.newInstance(clazz, getNumRetainedItems()); |
| int i = 0; |
| if (data_ != null) { |
| for (T item : data_) { |
| if (item != null) { |
| itemsArray[i++] = item; |
| } |
| } |
| } |
| if (partialItem_ != null) { |
| itemsArray[i] = partialItem_; // no need to increment i again |
| } |
| |
| return itemsArray; |
| } |
| |
| // package-private for use in merge and serialization |
| ArrayList<T> getFullItems() { |
| return data_; |
| } |
| |
| // package-private for use in merge and serialization |
| T getPartialItem() { |
| return partialItem_; |
| } |
| |
| double getC() { return c_; } |
| |
| boolean hasPartialItem() { return partialItem_ != null; } |
| |
| // for testing to allow setting the seed |
| void replaceRandom(final Random r) { |
| rand_ = r; |
| } |
| |
| void downsample(final double theta) { |
| if (theta >= 1.0) { return; } |
| |
| final double newC = theta * c_; |
| final double newCInt = Math.floor(newC); |
| final double newCFrac = newC % 1; |
| final double cInt = Math.floor(c_); |
| final double cFrac = c_ % 1; |
| |
| if (newCInt == 0.0) { |
| // no full items retained |
| if (rand_.nextDouble() > (cFrac / c_)) { |
| swapWithPartialItem(); |
| } |
| data_.clear(); |
| } else if (newCInt == cInt) { |
| // no items deleted |
| if (rand_.nextDouble() > (1 - theta * cFrac) / (1 - newCFrac)) { |
| swapWithPartialItem(); |
| } |
| } else { |
| if (rand_.nextDouble() < theta * cFrac) { |
| // subsample data in random order; last item is partial |
| // create sample size newC then swapWithPartialItem() |
| subsample((int) newCInt); |
| swapWithPartialItem(); |
| } else { |
| // create sample size newCInt + 1 then moveOneToPartialItem() |
| subsample((int) newCInt + 1); |
| moveOneToPartialItem(); |
| } |
| } |
| |
| if (newC == newCInt) { |
| partialItem_ = null; |
| } |
| |
| c_ = newC; |
| } |
| |
| void merge(final EbppsItemsSample<T> other) { |
| //double cInt = Math.floor(c_); |
| final double cFrac = c_ % 1; |
| final double otherCFrac = other.c_ % 1; |
| |
| // update c_ here but do NOT recompute fractional part yet |
| c_ += other.c_; |
| |
| if (other.data_ != null) { |
| data_.addAll(other.data_); |
| } |
| |
| // This modifies the original algorithm slightly due to numeric |
| // precision issues. Specifically, the test if cFrac + otherCFrac == 1.0 |
| // happens before tests for < 1.0 or > 1.0 and can also be triggered |
| // if c_ == floor(c_) (the updated value of c_, not the input). |
| // |
| // We can still run into issues where cFrac + otherCFrac == epsilon |
| // and the first case would have ideally triggered. As a result, we must |
| // check if the partial item exists before adding to the data_ vector. |
| |
| if (cFrac == 0.0 && otherCFrac == 0.0) { |
| partialItem_ = null; |
| } else if (cFrac + otherCFrac == 1.0 || c_ == Math.floor(c_)) { |
| if (rand_.nextDouble() <= cFrac) { |
| if (partialItem_ != null) { |
| data_.add(partialItem_); |
| } |
| } else { |
| if (other.partialItem_ != null) { |
| data_.add(other.partialItem_); |
| } |
| } |
| partialItem_ = null; |
| } else if (cFrac + otherCFrac < 1.0) { |
| if (rand_.nextDouble() > cFrac / (cFrac + otherCFrac)) { |
| partialItem_ = other.partialItem_; |
| } |
| } else { // cFrac + otherCFrac > 1 |
| if (rand_.nextDouble() <= (1 - cFrac) / ((1 - cFrac) + (1 - otherCFrac))) { |
| data_.add(other.partialItem_); |
| } else { |
| data_.add(partialItem_); |
| partialItem_ = other.partialItem_; |
| } |
| } |
| } |
| |
| @Override |
| public String toString() { |
| final StringBuilder sb = new StringBuilder(); |
| |
| sb.append(" sample:").append(LS); |
| int idx = 0; |
| for (T item : data_) { |
| sb.append("\t").append(idx++).append(":\t").append(item.toString()).append(LS); |
| } |
| sb.append(" partial: "); |
| if (partialItem_ != null) { |
| sb.append(partialItem_).append(LS); |
| } else { |
| sb.append("NULL").append(LS); |
| } |
| |
| return sb.toString(); |
| } |
| |
| void subsample(final int numSamples) { |
| // we can perform a Fisher-Yates style shuffle, stopping after |
| // numSamples points since subsequent swaps would only be |
| // between items after num_samples. This is valid since a |
| // point from anywhere in the initial array would be eligible |
| // to end up in the final subsample. |
| |
| if (numSamples == data_.size()) { return; } |
| |
| final int dataLen = data_.size(); |
| for (int i = 0; i < numSamples; ++i) { |
| final int j = i + rand_.nextInt(dataLen - i); |
| // swap i and j |
| final T tmp = data_.get(i); |
| data_.set(i, data_.get(j)); |
| data_.set(j, tmp); |
| } |
| |
| // clear anything beyond numSamples |
| data_.subList(numSamples, data_.size()).clear(); |
| } |
| |
| void swapWithPartialItem() { |
| if (partialItem_ == null) { |
| moveOneToPartialItem(); |
| } else { |
| final int idx = rand_.nextInt(data_.size()); |
| final T tmp = partialItem_; |
| partialItem_ = data_.get(idx); |
| data_.set(idx, tmp); |
| } |
| } |
| |
| void moveOneToPartialItem() { |
| final int idx = rand_.nextInt(data_.size()); |
| // swap selected item to end so we can delete it easily |
| final int lastIdx = data_.size() - 1; |
| if (idx != lastIdx) { |
| final T tmp = data_.get(idx); |
| data_.set(idx, data_.get(lastIdx)); |
| partialItem_ = tmp; |
| } else { |
| partialItem_ = data_.get(lastIdx); |
| } |
| |
| data_.remove(lastIdx); |
| } |
| |
| int getNumRetainedItems() { |
| return (data_ != null ? data_.size() : 0) |
| + (partialItem_ != null ? 1 : 0); |
| } |
| } |