| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.runners.dataflow.worker.util; |
| |
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.io.Serializable; |
| import java.math.RoundingMode; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Objects; |
| import org.apache.beam.sdk.coders.AtomicCoder; |
| import org.apache.beam.sdk.coders.Coder; |
| import org.apache.beam.sdk.coders.CoderException; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.BloomFilter; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Funnel; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.PrimitiveSink; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.DoubleMath; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.LongMath; |
| |
| /** |
| * A Bloom filter implementation that maintains an expected false probability of {@code 0.000001} |
| * while growing with the number of inserations up to a specified size or insertion limit. Once the |
| * limit is reached, the expected false positive probability can no longer be honored. This limit |
| * prevents the Bloom filter from growing too large when there are many insertions. The Bloom filter |
| * that is built is the smaller of twice the number of insertions or the user specified limit. |
| */ |
| public class ScalableBloomFilter implements Serializable { |
| /** |
| * A {@link Coder} for scalable Bloom filters. The encoded format is (see {@link |
| * BloomFilter#writeTo(OutputStream)}): |
| * |
| * <ul> |
| * <li>1 signed byte for the strategy |
| * <li>1 unsigned byte for the number of hash functions |
| * <li>1 big endian int, the number of longs in our bitset |
| * <li>N big endian longs of our bitset |
| * </ul> |
| */ |
| public static class ScalableBloomFilterCoder extends AtomicCoder<ScalableBloomFilter> { |
| private static final ScalableBloomFilterCoder INSTANCE = new ScalableBloomFilterCoder(); |
| |
| public static ScalableBloomFilterCoder of() { |
| return INSTANCE; |
| } |
| |
| @Override |
| public void encode(ScalableBloomFilter value, OutputStream outStream) |
| throws CoderException, IOException { |
| value.bloomFilter.writeTo(outStream); |
| } |
| |
| @Override |
| public ScalableBloomFilter decode(InputStream inStream) throws CoderException, IOException { |
| return new ScalableBloomFilter(BloomFilter.readFrom(inStream, ByteBufferFunnel.INSTANCE)); |
| } |
| |
| @Override |
| public void verifyDeterministic() {} |
| |
| @Override |
| public boolean consistentWithEquals() { |
| return true; |
| } |
| } |
| |
| private static final long MAX_ELEMENTS = 1L << 62; |
| private static final double DEFAULT_FALSE_POSITIVE_PROBABILITY = 0.000001; |
| |
| private final BloomFilter<ByteBuffer> bloomFilter; |
| |
| private ScalableBloomFilter(BloomFilter<ByteBuffer> bloomFilter) { |
| this.bloomFilter = bloomFilter; |
| } |
| |
| /** |
| * Returns false if the Bloom filter definitely does not contain the byte representation of an |
| * element contained in {@code buf} from {@code [offset, offset + length)}. |
| */ |
| public boolean mightContain(byte[] buf, int offset, int length) { |
| ByteBuffer byteBuffer = ByteBuffer.wrap(buf, offset, length); |
| return mightContain(byteBuffer); |
| } |
| |
| /** |
| * Returns false if the Bloom filter definitely does not contain the byte representation of an |
| * element contained in {@code byteBuffer} from {@code [position, limit]}. |
| */ |
| public boolean mightContain(ByteBuffer byteBuffer) { |
| return bloomFilter.mightContain(byteBuffer); |
| } |
| |
| @Override |
| public boolean equals(Object other) { |
| if (other == this) { |
| return true; |
| } |
| if (!(other instanceof ScalableBloomFilter)) { |
| return false; |
| } |
| ScalableBloomFilter scalableBloomFilter = (ScalableBloomFilter) other; |
| return Objects.equals(bloomFilter, scalableBloomFilter.bloomFilter); |
| } |
| |
| @Override |
| public int hashCode() { |
| return bloomFilter.hashCode(); |
| } |
| |
| @Override |
| public String toString() { |
| return MoreObjects.toStringHelper(ScalableBloomFilter.class) |
| .add("bloomFilter", bloomFilter) |
| .toString(); |
| } |
| |
| /** |
| * Returns a scalable Bloom filter builder allowing one to construct a Bloom filter with the |
| * specified size limit. Each insertion may grow the Bloom filter up to the specified size limit. |
| * While within the size limit, the expected false positive probability is {@code 0.000001}. Once |
| * the size limit is surpassed, the Bloom filter will no longer honor the expected false positive |
| * probability of {@code 0.000001}. |
| */ |
| public static Builder withMaximumSizeBytes(long maxBloomFilterSizeBytes) { |
| checkArgument(maxBloomFilterSizeBytes > 0, "Expected Bloom filter size limit to be positive."); |
| long optimalNumberOfElements = |
| optimalNumInsertions(maxBloomFilterSizeBytes, DEFAULT_FALSE_POSITIVE_PROBABILITY); |
| checkArgument( |
| optimalNumberOfElements <= MAX_ELEMENTS, |
| "The specified size limit would attempt to create a Bloom filter builder larger than " |
| + "the maximum supported size of 2^63."); |
| |
| return withMaximumNumberOfInsertionsForOptimalBloomFilter(optimalNumberOfElements); |
| } |
| |
| /** |
| * Returns a scalable Bloom filter builder allowing one to construct a Bloom filter that will |
| * maintain the expected false positive probability of {@code 0.000001} as long as there have been |
| * fewer insertions then the specified limit. Once the limit is surpassed, the Bloom filter will |
| * no longer honor the expected false positive probability of {@code 0.000001}. |
| */ |
| public static Builder withMaximumNumberOfInsertionsForOptimalBloomFilter( |
| long maximumBloomFilterInsertionsForOptimalBloomFilter) { |
| checkArgument( |
| maximumBloomFilterInsertionsForOptimalBloomFilter > 0, |
| "Expected Bloom filter insertion limit to be positive."); |
| checkArgument( |
| maximumBloomFilterInsertionsForOptimalBloomFilter <= MAX_ELEMENTS, |
| "The specified size is larger than the maximum supported size of 2^63 elements."); |
| |
| return new Builder(maximumBloomFilterInsertionsForOptimalBloomFilter); |
| } |
| |
| /** Calculate the number of optimal insertions based upon the number of bytes. */ |
| private static long optimalNumInsertions(long bytes, double p) { |
| checkArgument(p > 0, "Expected false positive probability to be positive."); |
| return LongMath.checkedMultiply( |
| 8, |
| DoubleMath.roundToLong( |
| -bytes * Math.log(2) * Math.log(2) / Math.log(p), RoundingMode.DOWN)); |
| } |
| |
| /** |
| * A scalable Bloom filter builder which uses approximately double the amount of memory for the |
| * specified optimal number of insertions. The returned Bloom filter will scale with the number of |
| * insertions honoring the expected false positive probability rate of {@code 0.000001} until the |
| * limit is reached. The Bloom filter that is built is the smaller of twice the number of |
| * insertions or the user specified limit. |
| */ |
| public static class Builder { |
| private final List<BloomFilter<ByteBuffer>> bloomFilters; |
| private long numberOfInsertions; |
| |
| private Builder(long maximumBloomFilterInsertions) { |
| this.bloomFilters = new ArrayList<>(); |
| |
| // 1, 2, 4, 8, 16, 32, ... |
| for (long i = 1; i < maximumBloomFilterInsertions; i = i << 1) { |
| bloomFilters.add( |
| BloomFilter.<ByteBuffer>create( |
| ByteBufferFunnel.INSTANCE, i, DEFAULT_FALSE_POSITIVE_PROBABILITY)); |
| } |
| |
| // Add the largest Bloom filter we will scale to based off of what the user requested. |
| bloomFilters.add( |
| BloomFilter.<ByteBuffer>create( |
| ByteBufferFunnel.INSTANCE, |
| maximumBloomFilterInsertions, |
| DEFAULT_FALSE_POSITIVE_PROBABILITY)); |
| } |
| |
| /** |
| * Returns true if the Bloom filter was modified by inserting the byte representation of an |
| * element contained in {@code buf} from {@code [offset, offset + length)}. |
| */ |
| public boolean put(final byte[] buf, final int off, final int len) { |
| ByteBuffer buffer = ByteBuffer.wrap(buf, off, len); |
| return put(buffer); |
| } |
| |
| /** |
| * Returns true if the Bloom filter was modified by inserting the byte representation of an |
| * element contained in {@code byteBuffer} from {@code [position, limit)}. |
| */ |
| public boolean put(final ByteBuffer byteBuffer) { |
| if (bloomFilters.get(bloomFilters.size() - 1).mightContain(byteBuffer)) { |
| // We do not gain any information by adding this element |
| return false; |
| } |
| |
| int bloomFilterToStartWith = |
| Math.min( |
| Long.SIZE - Long.numberOfLeadingZeros(numberOfInsertions), bloomFilters.size() - 1); |
| for (int i = bloomFilterToStartWith; i < bloomFilters.size(); ++i) { |
| bloomFilters.get(i).put(byteBuffer); |
| } |
| numberOfInsertions += 1; |
| return true; |
| } |
| |
| /** Returns a scalable Bloom filter with the elements that were added. */ |
| public ScalableBloomFilter build() { |
| int bloomFilterToUse = Long.SIZE - Long.numberOfLeadingZeros(numberOfInsertions); |
| if (Long.bitCount(numberOfInsertions) == 1) { |
| bloomFilterToUse -= 1; |
| } |
| bloomFilterToUse = Math.min(bloomFilterToUse, bloomFilters.size() - 1); |
| return new ScalableBloomFilter(bloomFilters.get(bloomFilterToUse)); |
| } |
| } |
| |
| /** |
| * Writes {@link ByteBuffer}s to {@link PrimitiveSink}s and meant to be used with Guava's {@link |
| * BloomFilter} API. This {@link Funnel} does not modify the underlying byte buffer and assumes |
| * that {@code ByteBuffer#array} returns the backing data. |
| */ |
| private static class ByteBufferFunnel implements Funnel<ByteBuffer> { |
| private static final ByteBufferFunnel INSTANCE = new ByteBufferFunnel(); |
| |
| @Override |
| @SuppressWarnings("ByteBufferBackingArray") |
| public void funnel(ByteBuffer from, PrimitiveSink into) { |
| into.putBytes(from.array(), from.position(), from.remaining()); |
| } |
| } |
| } |