| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.transforms; |
| |
| import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument; |
| |
| import java.io.DataInputStream; |
| import java.io.DataOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Objects; |
| import java.util.PriorityQueue; |
| import javax.annotation.Nullable; |
| import org.apache.beam.sdk.coders.BigEndianIntegerCoder; |
| import org.apache.beam.sdk.coders.Coder; |
| import org.apache.beam.sdk.coders.CoderException; |
| import org.apache.beam.sdk.coders.CoderRegistry; |
| import org.apache.beam.sdk.coders.CustomCoder; |
| import org.apache.beam.sdk.coders.ListCoder; |
| import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn; |
| import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn.Accumulator; |
| import org.apache.beam.sdk.transforms.display.DisplayData; |
| import org.apache.beam.sdk.util.WeightedValue; |
| import org.apache.beam.sdk.util.common.ElementByteSizeObserver; |
| import org.apache.beam.sdk.values.KV; |
| import org.apache.beam.sdk.values.PCollection; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.UnmodifiableIterator; |
| |
| /** |
| * {@code PTransform}s for getting an idea of a {@code PCollection}'s data distribution using |
| * approximate {@code N}-tiles (e.g. quartiles, percentiles, etc.), either globally or per-key. |
| */ |
| public class ApproximateQuantiles { |
| private ApproximateQuantiles() { |
| // do not instantiate |
| } |
| |
| /** |
| * Returns a {@code PTransform} that takes a {@code PCollection<T>} and returns a {@code |
| * PCollection<List<T>>} whose single value is a {@code List} of the approximate {@code N}-tiles |
| * of the elements of the input {@code PCollection}. This gives an idea of the distribution of the |
| * input elements. |
| * |
| * <p>The computed {@code List} is of size {@code numQuantiles}, and contains the input elements' |
| * minimum value, {@code numQuantiles-2} intermediate values, and maximum value, in sorted order, |
| * using the given {@code Comparator} to order values. To compute traditional {@code N}-tiles, one |
| * should use {@code ApproximateQuantiles.globally(N+1, compareFn)}. |
| * |
| * <p>If there are fewer input elements than {@code numQuantiles}, then the result {@code List} |
| * will contain all the input elements, in sorted order. |
| * |
| * <p>The argument {@code Comparator} must be {@code Serializable}. |
| * |
| * <p>Example of use: |
| * |
| * <pre>{@code |
| * PCollection<String> pc = ...; |
| * PCollection<List<String>> quantiles = |
| * pc.apply(ApproximateQuantiles.globally(11, stringCompareFn)); |
| * }</pre> |
| * |
| * @param <T> the type of the elements in the input {@code PCollection} |
| * @param numQuantiles the number of elements in the resulting quantile values {@code List} |
| * @param compareFn the function to use to order the elements |
| */ |
| public static <T, ComparatorT extends Comparator<T> & Serializable> |
| PTransform<PCollection<T>, PCollection<List<T>>> globally( |
| int numQuantiles, ComparatorT compareFn) { |
| return Combine.globally(ApproximateQuantilesCombineFn.create(numQuantiles, compareFn)); |
| } |
| |
| /** |
| * Like {@link #globally(int, Comparator)}, but sorts using the elements' natural ordering. |
| * |
| * @param <T> the type of the elements in the input {@code PCollection} |
| * @param numQuantiles the number of elements in the resulting quantile values {@code List} |
| */ |
| public static <T extends Comparable<T>> PTransform<PCollection<T>, PCollection<List<T>>> globally( |
| int numQuantiles) { |
| return Combine.globally(ApproximateQuantilesCombineFn.<T>create(numQuantiles)); |
| } |
| |
| /** |
| * Returns a {@code PTransform} that takes a {@code PCollection<KV<K, V>>} and returns a {@code |
| * PCollection<KV<K, List<V>>>} that contains an output element mapping each distinct key in the |
| * input {@code PCollection} to a {@code List} of the approximate {@code N}-tiles of the values |
| * associated with that key in the input {@code PCollection}. This gives an idea of the |
| * distribution of the input values for each key. |
| * |
| * <p>Each of the computed {@code List}s is of size {@code numQuantiles}, and contains the input |
| * values' minimum value, {@code numQuantiles-2} intermediate values, and maximum value, in sorted |
| * order, using the given {@code Comparator} to order values. To compute traditional {@code |
| * N}-tiles, one should use {@code ApproximateQuantiles.perKey(compareFn, N+1)}. |
| * |
| * <p>If a key has fewer than {@code numQuantiles} values associated with it, then that key's |
| * output {@code List} will contain all the key's input values, in sorted order. |
| * |
| * <p>The argument {@code Comparator} must be {@code Serializable}. |
| * |
| * <p>Example of use: |
| * |
| * <pre>{@code |
| * PCollection<KV<Integer, String>> pc = ...; |
| * PCollection<KV<Integer, List<String>>> quantilesPerKey = |
| * pc.apply(ApproximateQuantiles.<Integer, String>perKey(stringCompareFn, 11)); |
| * }</pre> |
| * |
| * <p>See {@link Combine.PerKey} for how this affects timestamps and windowing. |
| * |
| * @param <K> the type of the keys in the input and output {@code PCollection}s |
| * @param <V> the type of the values in the input {@code PCollection} |
| * @param numQuantiles the number of elements in the resulting quantile values {@code List} |
| * @param compareFn the function to use to order the elements |
| */ |
| public static <K, V, ComparatorT extends Comparator<V> & Serializable> |
| PTransform<PCollection<KV<K, V>>, PCollection<KV<K, List<V>>>> perKey( |
| int numQuantiles, ComparatorT compareFn) { |
| return Combine.perKey(ApproximateQuantilesCombineFn.create(numQuantiles, compareFn)); |
| } |
| |
| /** |
| * Like {@link #perKey(int, Comparator)}, but sorts values using the their natural ordering. |
| * |
| * @param <K> the type of the keys in the input and output {@code PCollection}s |
| * @param <V> the type of the values in the input {@code PCollection} |
| * @param numQuantiles the number of elements in the resulting quantile values {@code List} |
| */ |
| public static <K, V extends Comparable<V>> |
| PTransform<PCollection<KV<K, V>>, PCollection<KV<K, List<V>>>> perKey(int numQuantiles) { |
| return Combine.perKey(ApproximateQuantilesCombineFn.<V>create(numQuantiles)); |
| } |
| |
| ///////////////////////////////////////////////////////////////////////////// |
| |
| /** |
| * The {@code ApproximateQuantilesCombineFn} combiner gives an idea of the distribution of a |
| * collection of values using approximate {@code N}-tiles. The output of this combiner is a {@code |
| * List} of size {@code numQuantiles}, containing the input values' minimum value, {@code |
| * numQuantiles-2} intermediate values, and maximum value, in sorted order, so for traditional |
| * {@code N}-tiles, one should use {@code ApproximateQuantilesCombineFn#create(N+1)}. |
| * |
| * <p>If there are fewer values to combine than {@code numQuantiles}, then the result {@code List} |
| * will contain all the values being combined, in sorted order. |
| * |
| * <p>Values are ordered using either a specified {@code Comparator} or the values' natural |
| * ordering. |
| * |
| * <p>To evaluate the quantiles we use the "New Algorithm" described here: |
| * |
| * <pre> |
| * [MRL98] Manku, Rajagopalan & Lindsay, "Approximate Medians and other |
| * Quantiles in One Pass and with Limited Memory", Proc. 1998 ACM |
| * SIGMOD, Vol 27, No 2, p 426-435, June 1998. |
| * http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.6.6513&rep=rep1&type=pdf |
| * </pre> |
| * |
| * <p>The default error bound is {@code 1 / N}, though in practice the accuracy tends to be much |
| * better. |
| * |
| * <p>See {@link #create(int, Comparator, long, double)} for more information about the meaning of |
| * {@code epsilon}, and {@link #withEpsilon} for a convenient way to adjust it. |
| * |
| * @param <T> the type of the values being combined |
| */ |
| public static class ApproximateQuantilesCombineFn< |
| T, ComparatorT extends Comparator<T> & Serializable> |
| extends AccumulatingCombineFn<T, QuantileState<T, ComparatorT>, List<T>> { |
| |
| /** |
| * The cost (in time and space) to compute quantiles to a given accuracy is a function of the |
| * total number of elements in the data set. If an estimate is not known or specified, we use |
| * this as an upper bound. If this is too low, errors may exceed the requested tolerance; if too |
| * high, efficiency may be non-optimal. The impact is logarithmic with respect to this value, so |
| * this default should be fine for most uses. |
| */ |
| public static final long DEFAULT_MAX_NUM_ELEMENTS = (long) 1e9; |
| |
| /** The comparison function to use. */ |
| private final ComparatorT compareFn; |
| |
| /** |
| * Number of quantiles to produce. The size of the final output list, including the minimum and |
| * maximum, is numQuantiles. |
| */ |
| private final int numQuantiles; |
| |
| /** The size of the buffers, corresponding to k in the referenced paper. */ |
| private final int bufferSize; |
| |
| /** The number of buffers, corresponding to b in the referenced paper. */ |
| private final int numBuffers; |
| |
| private final long maxNumElements; |
| |
| private ApproximateQuantilesCombineFn( |
| int numQuantiles, |
| ComparatorT compareFn, |
| int bufferSize, |
| int numBuffers, |
| long maxNumElements) { |
| checkArgument(numQuantiles >= 2); |
| checkArgument(bufferSize >= 2); |
| checkArgument(numBuffers >= 2); |
| this.numQuantiles = numQuantiles; |
| this.compareFn = compareFn; |
| this.bufferSize = bufferSize; |
| this.numBuffers = numBuffers; |
| this.maxNumElements = maxNumElements; |
| } |
| |
| /** |
| * Returns an approximate quantiles combiner with the given {@code compareFn} and desired number |
| * of quantiles. A total of {@code numQuantiles} elements will appear in the output list, |
| * including the minimum and maximum. |
| * |
| * <p>The {@code Comparator} must be {@code Serializable}. |
| * |
| * <p>The default error bound is {@code 1 / numQuantiles}, which holds as long as the number of |
| * elements is less than {@link #DEFAULT_MAX_NUM_ELEMENTS}. |
| */ |
| public static <T, ComparatorT extends Comparator<T> & Serializable> |
| ApproximateQuantilesCombineFn<T, ComparatorT> create( |
| int numQuantiles, ComparatorT compareFn) { |
| return create(numQuantiles, compareFn, DEFAULT_MAX_NUM_ELEMENTS, 1.0 / numQuantiles); |
| } |
| |
| /** Like {@link #create(int, Comparator)}, but sorts values using their natural ordering. */ |
| public static <T extends Comparable<T>> ApproximateQuantilesCombineFn<T, Top.Natural<T>> create( |
| int numQuantiles) { |
| return create(numQuantiles, new Top.Natural<T>()); |
| } |
| |
| /** |
| * Returns an {@code ApproximateQuantilesCombineFn} that's like this one except that it uses the |
| * specified {@code epsilon} value. Does not modify this combiner. |
| * |
| * <p>See {@link #create(int, Comparator, long, double)} for more information about the meaning |
| * of {@code epsilon}. |
| */ |
| public ApproximateQuantilesCombineFn<T, ComparatorT> withEpsilon(double epsilon) { |
| return create(numQuantiles, compareFn, maxNumElements, epsilon); |
| } |
| |
| /** |
| * Returns an {@code ApproximateQuantilesCombineFn} that's like this one except that it uses the |
| * specified {@code maxNumElements} value. Does not modify this combiner. |
| * |
| * <p>See {@link #create(int, Comparator, long, double)} for more information about the meaning |
| * of {@code maxNumElements}. |
| */ |
| public ApproximateQuantilesCombineFn<T, ComparatorT> withMaxInputSize(long maxNumElements) { |
| return create(numQuantiles, compareFn, maxNumElements, maxNumElements); |
| } |
| |
| /** |
| * Creates an approximate quantiles combiner with the given {@code compareFn} and desired number |
| * of quantiles. A total of {@code numQuantiles} elements will appear in the output list, |
| * including the minimum and maximum. |
| * |
| * <p>The {@code Comparator} must be {@code Serializable}. |
| * |
| * <p>The default error bound is {@code epsilon}, which holds as long as the number of elements |
| * is less than {@code maxNumElements}. Specifically, if one considers the input as a sorted |
| * list x_1, ..., x_N, then the distance between the each exact quantile x_c and its |
| * approximation x_c' is bounded by {@code |c - c'| < epsilon * N}. Note that these errors are |
| * worst-case scenarios; in practice the accuracy tends to be much better. |
| */ |
| public static <T, ComparatorT extends Comparator<T> & Serializable> |
| ApproximateQuantilesCombineFn<T, ComparatorT> create( |
| int numQuantiles, ComparatorT compareFn, long maxNumElements, double epsilon) { |
| // Compute optimal b and k. |
| int b = 2; |
| while ((b - 2) * (1 << (b - 2)) < epsilon * maxNumElements) { |
| b++; |
| } |
| b--; |
| int k = Math.max(2, (int) Math.ceil(maxNumElements / (float) (1 << (b - 1)))); |
| return new ApproximateQuantilesCombineFn<>(numQuantiles, compareFn, k, b, maxNumElements); |
| } |
| |
| @Override |
| public QuantileState<T, ComparatorT> createAccumulator() { |
| return QuantileState.empty(compareFn, numQuantiles, numBuffers, bufferSize); |
| } |
| |
| @Override |
| public Coder<QuantileState<T, ComparatorT>> getAccumulatorCoder( |
| CoderRegistry registry, Coder<T> elementCoder) { |
| return new QuantileStateCoder<>(compareFn, elementCoder); |
| } |
| |
| @Override |
| public void populateDisplayData(DisplayData.Builder builder) { |
| super.populateDisplayData(builder); |
| builder |
| .add(DisplayData.item("numQuantiles", numQuantiles).withLabel("Quantile Count")) |
| .add(DisplayData.item("comparer", compareFn.getClass()).withLabel("Record Comparer")); |
| } |
| |
| int getNumBuffers() { |
| return numBuffers; |
| } |
| |
| int getBufferSize() { |
| return bufferSize; |
| } |
| } |
| |
| /** Compact summarization of a collection on which quantiles can be estimated. */ |
| static class QuantileState<T, ComparatorT extends Comparator<T> & Serializable> |
| implements Accumulator<T, QuantileState<T, ComparatorT>, List<T>> { |
| |
| private ComparatorT compareFn; |
| private int numQuantiles; |
| private int numBuffers; |
| private int bufferSize; |
| |
| @Nullable private T min; |
| |
| @Nullable private T max; |
| |
| /** The set of buffers, ordered by level from smallest to largest. */ |
| private PriorityQueue<QuantileBuffer<T>> buffers; |
| |
| /** |
| * The algorithm requires that the manipulated buffers always be filled to capacity to perform |
| * the collapse operation. This operation can be extended to buffers of varying sizes by |
| * introducing the notion of fractional weights, but it's easier to simply combine the |
| * remainders from all shards into new, full buffers and then take them into account when |
| * computing the final output. |
| */ |
| private List<T> unbufferedElements = Lists.newArrayList(); |
| |
| private QuantileState( |
| ComparatorT compareFn, |
| int numQuantiles, |
| @Nullable T min, |
| @Nullable T max, |
| int numBuffers, |
| int bufferSize, |
| Collection<T> unbufferedElements, |
| Collection<QuantileBuffer<T>> buffers) { |
| this.compareFn = compareFn; |
| this.numQuantiles = numQuantiles; |
| this.numBuffers = numBuffers; |
| this.bufferSize = bufferSize; |
| this.buffers = |
| new PriorityQueue<>(numBuffers + 1, (q1, q2) -> Integer.compare(q1.level, q2.level)); |
| this.min = min; |
| this.max = max; |
| this.unbufferedElements.addAll(unbufferedElements); |
| this.buffers.addAll(buffers); |
| } |
| |
| public static <T, ComparatorT extends Comparator<T> & Serializable> |
| QuantileState<T, ComparatorT> empty( |
| ComparatorT compareFn, int numQuantiles, int numBuffers, int bufferSize) { |
| return new QuantileState<>( |
| compareFn, |
| numQuantiles, |
| null, /* min */ |
| null, /* max */ |
| numBuffers, |
| bufferSize, |
| Collections.emptyList(), |
| Collections.emptyList()); |
| } |
| |
| public static <T, ComparatorT extends Comparator<T> & Serializable> |
| QuantileState<T, ComparatorT> singleton( |
| ComparatorT compareFn, int numQuantiles, T elem, int numBuffers, int bufferSize) { |
| return new QuantileState<>( |
| compareFn, |
| numQuantiles, |
| elem, /* min */ |
| elem, /* max */ |
| numBuffers, |
| bufferSize, |
| Collections.singletonList(elem), |
| Collections.emptyList()); |
| } |
| |
| /** Add a new element to the collection being summarized by this state. */ |
| @Override |
| public void addInput(T elem) { |
| if (isEmpty()) { |
| min = max = elem; |
| } else if (compareFn.compare(elem, min) < 0) { |
| min = elem; |
| } else if (compareFn.compare(elem, max) > 0) { |
| max = elem; |
| } |
| addUnbuffered(elem); |
| } |
| |
| /** Add a new buffer to the unbuffered list, creating a new buffer and collapsing if needed. */ |
| private void addUnbuffered(T elem) { |
| unbufferedElements.add(elem); |
| if (unbufferedElements.size() == bufferSize) { |
| unbufferedElements.sort(compareFn); |
| buffers.add(new QuantileBuffer<>(unbufferedElements)); |
| unbufferedElements = Lists.newArrayListWithCapacity(bufferSize); |
| collapseIfNeeded(); |
| } |
| } |
| |
| /** |
| * Updates this as if adding all elements seen by other. |
| * |
| * <p>Note that this ignores the {@code Comparator} of the other {@link QuantileState}. In |
| * practice, they should generally be equal, but this method tolerates a mismatch. |
| */ |
| @Override |
| public void mergeAccumulator(QuantileState<T, ComparatorT> other) { |
| if (other.isEmpty()) { |
| return; |
| } |
| if (min == null || compareFn.compare(other.min, min) < 0) { |
| min = other.min; |
| } |
| if (max == null || compareFn.compare(other.max, max) > 0) { |
| max = other.max; |
| } |
| for (T elem : other.unbufferedElements) { |
| addUnbuffered(elem); |
| } |
| buffers.addAll(other.buffers); |
| collapseIfNeeded(); |
| } |
| |
| public boolean isEmpty() { |
| return unbufferedElements.isEmpty() && buffers.isEmpty(); |
| } |
| |
| private void collapseIfNeeded() { |
| while (buffers.size() > numBuffers) { |
| List<QuantileBuffer<T>> toCollapse = Lists.newArrayList(); |
| toCollapse.add(buffers.poll()); |
| toCollapse.add(buffers.poll()); |
| int minLevel = toCollapse.get(1).level; |
| while (!buffers.isEmpty() && buffers.peek().level == minLevel) { |
| toCollapse.add(buffers.poll()); |
| } |
| buffers.add(collapse(toCollapse)); |
| } |
| } |
| |
| private QuantileBuffer<T> collapse(Iterable<QuantileBuffer<T>> buffers) { |
| int newLevel = 0; |
| long newWeight = 0; |
| for (QuantileBuffer<T> buffer : buffers) { |
| // As presented in the paper, there should always be at least two |
| // buffers of the same (minimal) level to collapse, but it is possible |
| // to violate this condition when combining buffers from independently |
| // computed shards. If they differ we take the max. |
| newLevel = Math.max(newLevel, buffer.level + 1); |
| newWeight += buffer.weight; |
| } |
| List<T> newElements = interpolate(buffers, bufferSize, newWeight, offset(newWeight)); |
| return new QuantileBuffer<>(newLevel, newWeight, newElements); |
| } |
| |
| /** |
| * If the weight is even, we must round up or down. Alternate between these two options to avoid |
| * a bias. |
| */ |
| private long offset(long newWeight) { |
| if (newWeight % 2 == 1) { |
| return (newWeight + 1) / 2; |
| } else { |
| offsetJitter = 2 - offsetJitter; |
| return (newWeight + offsetJitter) / 2; |
| } |
| } |
| |
| /** For alternating between biasing up and down in the above even weight collapse operation. */ |
| private int offsetJitter = 0; |
| |
| /** |
| * Emulates taking the ordered union of all elements in buffers, repeated according to their |
| * weight, and picking out the (k * step + offset)-th elements of this list for {@code 0 <= k |
| * < count}. |
| */ |
| private List<T> interpolate( |
| Iterable<QuantileBuffer<T>> buffers, int count, double step, double offset) { |
| List<Iterator<WeightedValue<T>>> iterators = Lists.newArrayList(); |
| for (QuantileBuffer<T> buffer : buffers) { |
| iterators.add(buffer.sizedIterator()); |
| } |
| // Each of the buffers is already sorted by element. |
| Iterator<WeightedValue<T>> sorted = |
| Iterators.mergeSorted(iterators, (a, b) -> compareFn.compare(a.getValue(), b.getValue())); |
| |
| List<T> newElements = Lists.newArrayListWithCapacity(count); |
| WeightedValue<T> weightedElement = sorted.next(); |
| double current = weightedElement.getWeight(); |
| for (int j = 0; j < count; j++) { |
| double target = j * step + offset; |
| while (current <= target && sorted.hasNext()) { |
| weightedElement = sorted.next(); |
| current += weightedElement.getWeight(); |
| } |
| newElements.add(weightedElement.getValue()); |
| } |
| return newElements; |
| } |
| |
| /** |
| * Outputs numQuantiles elements consisting of the minimum, maximum, and numQuantiles - 2 evenly |
| * spaced intermediate elements. |
| * |
| * <p>Returns the empty list if no elements have been added. |
| */ |
| @Override |
| public List<T> extractOutput() { |
| if (isEmpty()) { |
| return Lists.newArrayList(); |
| } |
| long totalCount = unbufferedElements.size(); |
| for (QuantileBuffer<T> buffer : buffers) { |
| totalCount += bufferSize * buffer.weight; |
| } |
| List<QuantileBuffer<T>> all = Lists.newArrayList(buffers); |
| if (!unbufferedElements.isEmpty()) { |
| unbufferedElements.sort(compareFn); |
| all.add(new QuantileBuffer<>(unbufferedElements)); |
| } |
| double step = 1.0 * totalCount / (numQuantiles - 1); |
| double offset = (1.0 * totalCount - 1) / (numQuantiles - 1); |
| List<T> quantiles = interpolate(all, numQuantiles - 2, step, offset); |
| quantiles.add(0, min); |
| quantiles.add(max); |
| return quantiles; |
| } |
| } |
| |
| /** A single buffer in the sense of the referenced algorithm. */ |
| private static class QuantileBuffer<T> { |
| private int level; |
| private long weight; |
| private List<T> elements; |
| |
| public QuantileBuffer(List<T> elements) { |
| this(0, 1, elements); |
| } |
| |
| public QuantileBuffer(int level, long weight, List<T> elements) { |
| this.level = level; |
| this.weight = weight; |
| this.elements = elements; |
| } |
| |
| @Override |
| public String toString() { |
| return "QuantileBuffer[" |
| + "level=" |
| + level |
| + ", weight=" |
| + weight |
| + ", elements=" |
| + elements |
| + "]"; |
| } |
| |
| public Iterator<WeightedValue<T>> sizedIterator() { |
| return new UnmodifiableIterator<WeightedValue<T>>() { |
| Iterator<T> iter = elements.iterator(); |
| |
| @Override |
| public boolean hasNext() { |
| return iter.hasNext(); |
| } |
| |
| @Override |
| public WeightedValue<T> next() { |
| return WeightedValue.of(iter.next(), weight); |
| } |
| }; |
| } |
| } |
| |
| /** Coder for QuantileState. */ |
| private static class QuantileStateCoder<T, ComparatorT extends Comparator<T> & Serializable> |
| extends CustomCoder<QuantileState<T, ComparatorT>> { |
| private final ComparatorT compareFn; |
| private final Coder<T> elementCoder; |
| private final Coder<List<T>> elementListCoder; |
| private final Coder<Integer> intCoder = BigEndianIntegerCoder.of(); |
| |
| public QuantileStateCoder(ComparatorT compareFn, Coder<T> elementCoder) { |
| this.compareFn = compareFn; |
| this.elementCoder = elementCoder; |
| this.elementListCoder = ListCoder.of(elementCoder); |
| } |
| |
| @Override |
| public void encode(QuantileState<T, ComparatorT> state, OutputStream outStream) |
| throws CoderException, IOException { |
| intCoder.encode(state.numQuantiles, outStream); |
| intCoder.encode(state.bufferSize, outStream); |
| elementCoder.encode(state.min, outStream); |
| elementCoder.encode(state.max, outStream); |
| elementListCoder.encode(state.unbufferedElements, outStream); |
| BigEndianIntegerCoder.of().encode(state.buffers.size(), outStream); |
| for (QuantileBuffer<T> buffer : state.buffers) { |
| encodeBuffer(buffer, outStream); |
| } |
| } |
| |
| @Override |
| public QuantileState<T, ComparatorT> decode(InputStream inStream) |
| throws CoderException, IOException { |
| int numQuantiles = intCoder.decode(inStream); |
| int bufferSize = intCoder.decode(inStream); |
| T min = elementCoder.decode(inStream); |
| T max = elementCoder.decode(inStream); |
| List<T> unbufferedElements = elementListCoder.decode(inStream); |
| int numBuffers = BigEndianIntegerCoder.of().decode(inStream); |
| List<QuantileBuffer<T>> buffers = new ArrayList<>(numBuffers); |
| for (int i = 0; i < numBuffers; i++) { |
| buffers.add(decodeBuffer(inStream)); |
| } |
| return new QuantileState<>( |
| compareFn, numQuantiles, min, max, numBuffers, bufferSize, unbufferedElements, buffers); |
| } |
| |
| private void encodeBuffer(QuantileBuffer<T> buffer, OutputStream outStream) |
| throws CoderException, IOException { |
| DataOutputStream outData = new DataOutputStream(outStream); |
| outData.writeInt(buffer.level); |
| outData.writeLong(buffer.weight); |
| elementListCoder.encode(buffer.elements, outStream); |
| } |
| |
| private QuantileBuffer<T> decodeBuffer(InputStream inStream) |
| throws IOException, CoderException { |
| DataInputStream inData = new DataInputStream(inStream); |
| return new QuantileBuffer<>( |
| inData.readInt(), inData.readLong(), elementListCoder.decode(inStream)); |
| } |
| |
| /** |
| * Notifies ElementByteSizeObserver about the byte size of the encoded value using this coder. |
| */ |
| @Override |
| public void registerByteSizeObserver( |
| QuantileState<T, ComparatorT> state, ElementByteSizeObserver observer) throws Exception { |
| elementCoder.registerByteSizeObserver(state.min, observer); |
| elementCoder.registerByteSizeObserver(state.max, observer); |
| elementListCoder.registerByteSizeObserver(state.unbufferedElements, observer); |
| |
| BigEndianIntegerCoder.of().registerByteSizeObserver(state.buffers.size(), observer); |
| for (QuantileBuffer<T> buffer : state.buffers) { |
| observer.update(4L + 8); |
| elementListCoder.registerByteSizeObserver(buffer.elements, observer); |
| } |
| } |
| |
| @Override |
| public boolean equals(Object other) { |
| if (other == this) { |
| return true; |
| } |
| if (!(other instanceof QuantileStateCoder)) { |
| return false; |
| } |
| QuantileStateCoder<?, ?> that = (QuantileStateCoder<?, ?>) other; |
| return Objects.equals(this.elementCoder, that.elementCoder) |
| && Objects.equals(this.compareFn, that.compareFn); |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hash(elementCoder, compareFn); |
| } |
| |
| @Override |
| public void verifyDeterministic() throws NonDeterministicException { |
| verifyDeterministic(this, "QuantileState.ElementCoder must be deterministic", elementCoder); |
| verifyDeterministic( |
| this, "QuantileState.ElementListCoder must be deterministic", elementListCoder); |
| } |
| } |
| } |