blob: eabcb48b6a51df57a48860a03b1cdbfaa415834c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker.util;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.BloomFilter;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Funnel;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.PrimitiveSink;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.DoubleMath;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.LongMath;
/**
 * A Bloom filter implementation that maintains an expected false positive probability of {@code
 * 0.000001} while growing with the number of insertions up to a specified size or insertion limit.
 * Once the limit is reached, the expected false positive probability can no longer be honored.
 * This limit prevents the Bloom filter from growing too large when there are many insertions. The
 * Bloom filter that is built is the smaller of twice the number of insertions or the user specified
 * limit.
 */
public class ScalableBloomFilter implements Serializable {
  /**
   * A {@link Coder} for scalable Bloom filters. The encoded format is (see {@link
   * BloomFilter#writeTo(OutputStream)}):
   *
   * <ul>
   *   <li>1 signed byte for the strategy
   *   <li>1 unsigned byte for the number of hash functions
   *   <li>1 big endian int, the number of longs in our bitset
   *   <li>N big endian longs of our bitset
   * </ul>
   */
  public static class ScalableBloomFilterCoder extends AtomicCoder<ScalableBloomFilter> {
    private static final ScalableBloomFilterCoder INSTANCE = new ScalableBloomFilterCoder();

    /** Returns the shared coder instance. */
    public static ScalableBloomFilterCoder of() {
      return INSTANCE;
    }

    @Override
    public void encode(ScalableBloomFilter value, OutputStream outStream)
        throws CoderException, IOException {
      value.bloomFilter.writeTo(outStream);
    }

    @Override
    public ScalableBloomFilter decode(InputStream inStream) throws CoderException, IOException {
      // Decoding must use the same funnel that was used to build the filter so that
      // mightContain hashes lookups identically.
      return new ScalableBloomFilter(BloomFilter.readFrom(inStream, ByteBufferFunnel.INSTANCE));
    }

    /** The encoding (listed above) contains only the filter's strategy, hash count and bits. */
    @Override
    public void verifyDeterministic() {}

    @Override
    public boolean consistentWithEquals() {
      return true;
    }
  }

  /**
   * Upper bound on the insertion limit. The {@link Builder} sizes its filters by repeatedly
   * doubling a {@code long} while it is below the limit; capping the limit at 2^62 keeps that
   * doubling from overflowing.
   */
  private static final long MAX_ELEMENTS = 1L << 62;

  private static final double DEFAULT_FALSE_POSITIVE_PROBABILITY = 0.000001;

  private final BloomFilter<ByteBuffer> bloomFilter;

  private ScalableBloomFilter(BloomFilter<ByteBuffer> bloomFilter) {
    this.bloomFilter = bloomFilter;
  }

  /**
   * Returns false if the Bloom filter definitely does not contain the byte representation of an
   * element contained in {@code buf} from {@code [offset, offset + length)}.
   */
  public boolean mightContain(byte[] buf, int offset, int length) {
    ByteBuffer byteBuffer = ByteBuffer.wrap(buf, offset, length);
    return mightContain(byteBuffer);
  }

  /**
   * Returns false if the Bloom filter definitely does not contain the byte representation of an
   * element contained in {@code byteBuffer} from {@code [position, limit)}. The buffer's position
   * and limit are not modified.
   */
  public boolean mightContain(ByteBuffer byteBuffer) {
    return bloomFilter.mightContain(byteBuffer);
  }

  @Override
  public boolean equals(Object other) {
    if (other == this) {
      return true;
    }
    if (!(other instanceof ScalableBloomFilter)) {
      return false;
    }
    ScalableBloomFilter scalableBloomFilter = (ScalableBloomFilter) other;
    return Objects.equals(bloomFilter, scalableBloomFilter.bloomFilter);
  }

  @Override
  public int hashCode() {
    return bloomFilter.hashCode();
  }

  @Override
  public String toString() {
    return MoreObjects.toStringHelper(ScalableBloomFilter.class)
        .add("bloomFilter", bloomFilter)
        .toString();
  }

  /**
   * Returns a scalable Bloom filter builder allowing one to construct a Bloom filter with the
   * specified size limit. Each insertion may grow the Bloom filter up to the specified size limit.
   * While within the size limit, the expected false positive probability is {@code 0.000001}. Once
   * the size limit is surpassed, the Bloom filter will no longer honor the expected false positive
   * probability of {@code 0.000001}.
   *
   * @param maxBloomFilterSizeBytes the size budget in bytes; must be positive
   * @throws IllegalArgumentException if the budget is not positive, or if it is too small to hold
   *     even a single insertion at the target false positive probability
   */
  public static Builder withMaximumSizeBytes(long maxBloomFilterSizeBytes) {
    checkArgument(maxBloomFilterSizeBytes > 0, "Expected Bloom filter size limit to be positive.");
    long optimalNumberOfElements =
        optimalNumInsertions(maxBloomFilterSizeBytes, DEFAULT_FALSE_POSITIVE_PROBABILITY);
    checkArgument(
        optimalNumberOfElements <= MAX_ELEMENTS,
        "The specified size limit would attempt to create a Bloom filter builder larger than "
            + "the maximum supported size of 2^62.");
    // A zero result (tiny byte budgets) is rejected by the insertion-limit check below.
    return withMaximumNumberOfInsertionsForOptimalBloomFilter(optimalNumberOfElements);
  }

  /**
   * Returns a scalable Bloom filter builder allowing one to construct a Bloom filter that will
   * maintain the expected false positive probability of {@code 0.000001} as long as there have been
   * fewer insertions than the specified limit. Once the limit is surpassed, the Bloom filter will
   * no longer honor the expected false positive probability of {@code 0.000001}.
   *
   * @param maximumBloomFilterInsertionsForOptimalBloomFilter the insertion limit; must be in {@code
   *     (0, 2^62]}
   * @throws IllegalArgumentException if the limit is outside that range
   */
  public static Builder withMaximumNumberOfInsertionsForOptimalBloomFilter(
      long maximumBloomFilterInsertionsForOptimalBloomFilter) {
    checkArgument(
        maximumBloomFilterInsertionsForOptimalBloomFilter > 0,
        "Expected Bloom filter insertion limit to be positive.");
    checkArgument(
        maximumBloomFilterInsertionsForOptimalBloomFilter <= MAX_ELEMENTS,
        "The specified size is larger than the maximum supported size of 2^62 elements.");
    return new Builder(maximumBloomFilterInsertionsForOptimalBloomFilter);
  }

  /**
   * Calculates the number of insertions a Bloom filter of {@code bytes} bytes can hold at false
   * positive probability {@code p}, i.e. {@code n = -m * ln(2)^2 / ln(p)} with {@code m = 8 *
   * bytes} bits (rounded down).
   */
  private static long optimalNumInsertions(long bytes, double p) {
    checkArgument(p > 0, "Expected false positive probability to be positive.");
    return LongMath.checkedMultiply(
        8,
        DoubleMath.roundToLong(
            -bytes * Math.log(2) * Math.log(2) / Math.log(p), RoundingMode.DOWN));
  }

  /**
   * A scalable Bloom filter builder which uses approximately double the amount of memory for the
   * specified optimal number of insertions. The returned Bloom filter will scale with the number of
   * insertions honoring the expected false positive probability rate of {@code 0.000001} until the
   * limit is reached. The Bloom filter that is built is the smaller of twice the number of
   * insertions or the user specified limit.
   */
  public static class Builder {
    /**
     * Filter {@code k} (for every index but the last) is sized for {@code 2^k} insertions; the last
     * filter is sized for the user-requested maximum. Each filter receives every element inserted
     * while it still had spare capacity, so a larger-index filter holds a superset of the elements
     * of a smaller-index one.
     */
    private final List<BloomFilter<ByteBuffer>> bloomFilters;

    /** Number of {@link #put} calls that inserted a new element (returned true). */
    private long numberOfInsertions;

    private Builder(long maximumBloomFilterInsertions) {
      this.bloomFilters = new ArrayList<>();
      // 1, 2, 4, 8, 16, 32, ...
      for (long i = 1; i < maximumBloomFilterInsertions; i = i << 1) {
        bloomFilters.add(
            BloomFilter.<ByteBuffer>create(
                ByteBufferFunnel.INSTANCE, i, DEFAULT_FALSE_POSITIVE_PROBABILITY));
      }
      // Add the largest Bloom filter we will scale to based off of what the user requested.
      bloomFilters.add(
          BloomFilter.<ByteBuffer>create(
              ByteBufferFunnel.INSTANCE,
              maximumBloomFilterInsertions,
              DEFAULT_FALSE_POSITIVE_PROBABILITY));
    }

    /**
     * Returns true if the Bloom filter was modified by inserting the byte representation of an
     * element contained in {@code buf} from {@code [offset, offset + length)}.
     */
    public boolean put(final byte[] buf, final int off, final int len) {
      ByteBuffer buffer = ByteBuffer.wrap(buf, off, len);
      return put(buffer);
    }

    /**
     * Returns true if the Bloom filter was modified by inserting the byte representation of an
     * element contained in {@code byteBuffer} from {@code [position, limit)}. Returns false, and
     * leaves the builder unchanged, when the element is (possibly) already present in every filter
     * that {@link #build()} could return from now on.
     */
    public boolean put(final ByteBuffer byteBuffer) {
      // Skipping is only safe if every filter build() may still select already reports the
      // element. Consulting just the largest filter would let a false positive there drop the
      // element from the smaller filter build() might pick, producing a false negative.
      if (mightContainInAllFiltersFrom(smallestFilterHoldingAllInsertions(), byteBuffer)) {
        // We do not gain any information by adding this element
        return false;
      }
      int lastIndex = bloomFilters.size() - 1;
      // The (numberOfInsertions + 1)-th element fits in filter k once 2^k >= numberOfInsertions + 1,
      // i.e. starting at the bit length of numberOfInsertions.
      int bloomFilterToStartWith =
          Math.min(Long.SIZE - Long.numberOfLeadingZeros(numberOfInsertions), lastIndex);
      for (int i = bloomFilterToStartWith; i <= lastIndex; ++i) {
        bloomFilters.get(i).put(byteBuffer);
      }
      numberOfInsertions += 1;
      return true;
    }

    /**
     * Returns a scalable Bloom filter with the elements that were added.
     *
     * <p>The returned filter wraps one of this builder's internal filters rather than a copy, so
     * further {@link #put} calls on this builder may change it.
     */
    public ScalableBloomFilter build() {
      return new ScalableBloomFilter(bloomFilters.get(smallestFilterHoldingAllInsertions()));
    }

    /**
     * Returns the index of the smallest filter that contains every element inserted so far: the
     * first power-of-two filter with capacity {@code >= numberOfInsertions}, capped at the last
     * (user-sized) filter.
     */
    private int smallestFilterHoldingAllInsertions() {
      int index = Long.SIZE - Long.numberOfLeadingZeros(numberOfInsertions);
      if (Long.bitCount(numberOfInsertions) == 1) {
        // An exact power of two fits precisely in the filter one index lower.
        index -= 1;
      }
      return Math.min(index, bloomFilters.size() - 1);
    }

    /**
     * Returns true if every filter with index in {@code [fromIndex, size - 1]} might contain the
     * element.
     */
    private boolean mightContainInAllFiltersFrom(int fromIndex, ByteBuffer byteBuffer) {
      // Scan from the largest filter down: it is sized for the most insertions, so for a new
      // element it is typically the first to answer "definitely absent" and end the scan.
      for (int i = bloomFilters.size() - 1; i >= fromIndex; --i) {
        if (!bloomFilters.get(i).mightContain(byteBuffer)) {
          return false;
        }
      }
      return true;
    }
  }

  /**
   * Writes {@link ByteBuffer}s to {@link PrimitiveSink}s and meant to be used with Guava's {@link
   * BloomFilter} API. This {@link Funnel} writes the bytes in {@code [position, limit)} and does
   * not modify the buffer's position, limit or contents. Heap, sliced, read-only and direct buffers
   * are all supported; buffers with the same remaining bytes funnel identically.
   */
  private static class ByteBufferFunnel implements Funnel<ByteBuffer> {
    private static final ByteBufferFunnel INSTANCE = new ByteBufferFunnel();

    @Override
    public void funnel(ByteBuffer from, PrimitiveSink into) {
      if (from.hasArray()) {
        // arrayOffset() is non-zero for sliced buffers; position() is relative to it.
        into.putBytes(from.array(), from.arrayOffset() + from.position(), from.remaining());
      } else {
        // Direct or read-only buffers expose no accessible array; copy via a duplicate so the
        // caller's position is left untouched.
        byte[] bytes = new byte[from.remaining()];
        from.duplicate().get(bytes);
        into.putBytes(bytes);
      }
    }

    /**
     * Preserves the singleton across Java serialization so that a deserialized filter's funnel
     * still equals {@link #INSTANCE}, keeping filter {@code equals}/{@code hashCode} stable.
     */
    private Object readResolve() {
      return INSTANCE;
    }
  }
}