APEXMALHAR-2366 #resolve #comment Apply BloomFilter to Bucket, using an internal BloomFilter implementation
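
A quick usage sketch of the new SliceBloomFilter API (class and method names as
defined in the diff below; the sizing values are illustrative assumptions, not
recommendations):

  // expect up to 1M distinct keys with a 1% false positive probability
  SliceBloomFilter filter = new SliceBloomFilter(1000000, 0.01);
  Slice key = new Slice("user-42".getBytes());
  filter.put(key);
  filter.mightContain(key);             // always true for inserted keys: no false negatives
  filter.getFalsePositiveProbability(); // current FP probability given the keys added so far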
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
index 6292fe2..8e1112e 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
@@ -232,6 +232,12 @@
protected ConcurrentLinkedQueue<Long> windowsForFreeMemory = new ConcurrentLinkedQueue<>();
+ private static boolean disableBloomFilterByDefault = false;
+ private boolean disableBloomFilter = disableBloomFilterByDefault;
+ private static int bloomFilterDefaultExpectedElements = 1000000;
+ private SliceBloomFilter bloomFilter = null;
+ private int bloomFilterExpectedElements = bloomFilterDefaultExpectedElements;
+
private DefaultBucket()
{
//for kryo
@@ -247,6 +253,9 @@
public void setup(@NotNull ManagedStateContext managedStateContext)
{
this.managedStateContext = Preconditions.checkNotNull(managedStateContext, "managed state context");
+ if (!disableBloomFilter && bloomFilter == null) {
+ bloomFilter = new SliceBloomFilter(bloomFilterExpectedElements, 0.01); //1% false positive probability
+ }
}
@Override
@@ -353,6 +362,33 @@
}
}
+
+ private int filteredCount = 0;
+ private int unfilteredCount = 0;
+ private static final int STATISTICS_BARRIER_NUM = 10000;
+ /**
+ * Track how effective the bloom filter is and unload it if it is not helping performance.
+ *
+ * @param mightContain true if the bloom filter reports the key as possibly present
+ */
+ private void verifyBloomFilter(boolean mightContain)
+ {
+ if (!mightContain) {
+ filteredCount++;
+ } else {
+ unfilteredCount++;
+ }
+
+ if (unfilteredCount + filteredCount > STATISTICS_BARRIER_NUM && unfilteredCount > filteredCount * 4) {
+ unloadBloomFilter();
+ }
+ }
+
+ private void unloadBloomFilter()
+ {
+ bloomFilter = null;
+ }
+
/**
* Returns the value for the key from a valid time-bucket reader. Here, valid means the time bucket which is not purgeable.
* If the timebucketAssigner is of type MovingBoundaryTimeBucketAssigner and the time bucket is purgeable, then return null.
@@ -362,11 +398,21 @@
*/
private BucketedValue getValueFromTimeBucketReader(Slice key, long timeBucket)
{
-
if (managedStateContext.getTimeBucketAssigner() instanceof MovingBoundaryTimeBucketAssigner &&
timeBucket <= ((MovingBoundaryTimeBucketAssigner)managedStateContext.getTimeBucketAssigner()).getLowestPurgeableTimeBucket()) {
return null;
}
+
+ if (bloomFilter != null) {
+ boolean mightContain = bloomFilter.mightContain(key);
+
+ verifyBloomFilter(mightContain);
+
+ if (!mightContain) {
+ return null;
+ }
+ }
+
FileAccess.FileReader fileReader = readers.get(timeBucket);
if (fileReader != null) {
return readValue(fileReader, key, timeBucket);
@@ -417,6 +463,7 @@
{
// This call is lightweight
releaseMemory();
+
key = SliceUtils.toBufferSlice(key);
value = SliceUtils.toBufferSlice(value);
@@ -460,6 +507,14 @@
entryIter.remove();
for (Map.Entry<Slice, BucketedValue> entry : windowData.entrySet()) {
+ /*
+ * The data is still in memory and reachable until the memory is released,
+ * so add the key to the bloom filter here.
+ */
+ if (bloomFilter != null) {
+ bloomFilter.put(entry.getKey());
+ }
+
memoryFreed += entry.getKey().length + entry.getValue().getSize();
}
}
@@ -511,6 +566,7 @@
public Map<Slice, BucketedValue> checkpoint(long windowId)
{
releaseMemory();
+
try {
//transferring the data from flash to check-pointed state in finally block and re-initializing the flash.
return flash;
@@ -635,6 +691,48 @@
return valueStream;
}
+ public static int getBloomFilterDefaultExpectedElements()
+ {
+ return bloomFilterDefaultExpectedElements;
+ }
+
+ public static void setBloomFilterDefaultExpectedElements(int bloomFilterDefaultExpectedElements)
+ {
+ DefaultBucket.bloomFilterDefaultExpectedElements = bloomFilterDefaultExpectedElements;
+ }
+
+ public int getBloomFilterExpectedElements()
+ {
+ return bloomFilterExpectedElements;
+ }
+
+ public void setBloomFilterExpectedElements(int bloomFilterExpectedElements)
+ {
+ this.bloomFilterExpectedElements = bloomFilterExpectedElements;
+ }
+
+ public static boolean isDisableBloomFilterByDefault()
+ {
+ return disableBloomFilterByDefault;
+ }
+
+ public static void setDisableBloomFilterByDefault(boolean disableBloomFilterByDefault)
+ {
+ DefaultBucket.disableBloomFilterByDefault = disableBloomFilterByDefault;
+ }
+
+ public boolean isDisableBloomFilter()
+ {
+ return disableBloomFilter;
+ }
+
+ public void setDisableBloomFilter(boolean disableBloomFilter)
+ {
+ this.disableBloomFilter = disableBloomFilter;
+ if (disableBloomFilter) {
+ //only unload when disabling; when enabled, setup() creates the filter
+ unloadBloomFilter();
+ }
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(DefaultBucket.class);
}
}
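
The unload heuristic in verifyBloomFilter() above can be read in isolation; a
minimal sketch of the same condition (shouldUnload is a hypothetical helper, not
part of this patch):

  static final int STATISTICS_BARRIER_NUM = 10000; // same threshold as in DefaultBucket
  static boolean shouldUnload(int filteredCount, int unfilteredCount)
  {
    // unload once enough lookups have been observed and fewer than 20% of them
    // were filtered out (unfiltered > 4 * filtered), i.e. the filter rarely
    // saves a file read while every lookup still pays its hashing cost
    return unfilteredCount + filteredCount > STATISTICS_BARRIER_NUM
        && unfilteredCount > filteredCount * 4;
  }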
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/SliceBloomFilter.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/SliceBloomFilter.java
new file mode 100644
index 0000000..4fd9e02
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/SliceBloomFilter.java
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.util.BitSet;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A Bloom filter implementation whose keys are {@link Slice} objects.
+ */
+public class SliceBloomFilter
+{
+ private BitSet bitset;
+ private int bitSetSize;
+ private int expectedNumberOfFilterElements; // expected (maximum) number of elements to be added
+ private int numberOfAddedElements; // number of elements actually added to the Bloom filter
+ private int numberOfHashes; // number of hash functions
+ protected transient HashFunction hasher = new HashFunction();
+
+ /**
+ * Initialize the empty Bloom filter. The total bit size of the Bloom
+ * filter will be bitsPerElement * expectedNumberOfFilterElements.
+ *
+ * @param bitsPerElement
+ * is the number of bits used per element.
+ * @param expectedNumberOfFilterElements
+ * is the expected number of elements the filter will contain.
+ * @param numberOfHashes
+ * is the number of hash functions used.
+ */
+ private void setAttributes(double bitsPerElement, int expectedNumberOfFilterElements, int numberOfHashes)
+ {
+ this.expectedNumberOfFilterElements = expectedNumberOfFilterElements;
+ this.numberOfHashes = numberOfHashes;
+ this.bitSetSize = (int)Math.ceil(bitsPerElement * expectedNumberOfFilterElements);
+ numberOfAddedElements = 0;
+ this.bitset = new BitSet(bitSetSize);
+ }
+
+ private SliceBloomFilter()
+ {
+ //for kryo
+ }
+
+ /**
+ * Constructs an empty Bloom filter with a given false positive probability.
+ *
+ * @param expectedNumberOfElements
+ * is the expected number of elements the filter will contain.
+ * @param falsePositiveProbability
+ * is the desired false positive probability.
+ */
+ public SliceBloomFilter(int expectedNumberOfElements, double falsePositiveProbability)
+ {
+ //optimal number of hash functions k = ceil(-log2(p)); bits per element c = k / ln(2)
+ int numberOfHashes = (int)Math.ceil(-(Math.log(falsePositiveProbability) / Math.log(2)));
+ setAttributes(numberOfHashes / Math.log(2), expectedNumberOfElements, numberOfHashes);
+ }
+
+ /**
+ * Generate integer array based on the hash function till the number of
+ * hashes.
+ *
+ * @param slice
+ * specifies input slice.
+ * @return array of int-sized hashes
+ */
+ private int[] createHashes(Slice slice)
+ {
+ int[] result = new int[numberOfHashes];
+ long hash64 = hasher.hash(slice);
+ // Kirsch-Mitzenmacher "less hashing" technique: derive all hashes from two base hashes
+ int hash1 = (int)hash64;
+ int hash2 = (int)(hash64 >>> 32);
+ for (int i = 1; i <= numberOfHashes; i++) {
+ int nextHash = hash1 + i * hash2;
+ if (nextHash < 0) {
+ nextHash = ~nextHash;
+ }
+ result[i - 1] = nextHash;
+ }
+ return result;
+ }
+
+ /**
+ * Calculates the expected probability of false positives based on the number
+ * of expected filter elements and the size of the Bloom filter.
+ *
+ * @return expected probability of false positives.
+ */
+ public double expectedFalsePositiveProbability()
+ {
+ return getFalsePositiveProbability(expectedNumberOfFilterElements);
+ }
+
+ /**
+ * Calculate the probability of a false positive given the specified number of
+ * inserted elements.
+ *
+ * @param numberOfElements
+ * number of inserted elements.
+ * @return probability of a false positive.
+ */
+ public double getFalsePositiveProbability(double numberOfElements)
+ {
+ // (1 - e^(-k * n / m)) ^ k
+ return Math.pow((1 - Math.exp(-numberOfHashes * numberOfElements / bitSetSize)), numberOfHashes);
+ }
+
+ /**
+ * Get the current probability of a false positive. The probability is
+ * calculated from the size of the Bloom filter and the current number of
+ * elements added to it.
+ *
+ * @return probability of false positives.
+ */
+ public double getFalsePositiveProbability()
+ {
+ return getFalsePositiveProbability(numberOfAddedElements);
+ }
+
+ /**
+ * Returns the number of hash functions used. This is the optimal number
+ * based on the size of the Bloom filter and the expected number of
+ * inserted elements.
+ *
+ * @return optimal numberOfHashes.
+ */
+ public int getNumberOfHashes()
+ {
+ return numberOfHashes;
+ }
+
+ /**
+ * Sets all bits to false in the Bloom filter.
+ */
+ public void clear()
+ {
+ bitset.clear();
+ numberOfAddedElements = 0;
+ }
+
+ /**
+ * Adds a slice of byte array to the Bloom filter.
+ *
+ * @param slice
+ * slice of byte array to add to the Bloom filter.
+ */
+ public void put(Slice slice)
+ {
+ int[] hashes = createHashes(slice);
+ for (int hash : hashes) {
+ bitset.set(Math.abs(hash % bitSetSize), true);
+ }
+ numberOfAddedElements++;
+ }
+
+ /**
+ * Returns true if the slice of byte array could have been inserted into the Bloom
+ * filter. Use getFalsePositiveProbability() to calculate the probability of
+ * this being correct.
+ *
+ * @param slice
+ * slice of byte array to check.
+ * @return true if the array could have been inserted into the Bloom filter.
+ */
+ public boolean mightContain(Slice slice)
+ {
+ int[] hashes = createHashes(slice);
+ for (int hash : hashes) {
+ if (!bitset.get(Math.abs(hash % bitSetSize))) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Read a single bit from the Bloom filter.
+ *
+ * @param bit
+ * the bit to read.
+ * @return true if the bit is set, false if it is not.
+ */
+ public boolean getBit(int bit)
+ {
+ return bitset.get(bit);
+ }
+
+ /**
+ * Set a single bit in the Bloom filter.
+ *
+ * @param bit
+ * is the bit to set.
+ * @param value
+ * If true, the bit is set. If false, the bit is cleared.
+ */
+ public void setBit(int bit, boolean value)
+ {
+ bitset.set(bit, value);
+ }
+
+ /**
+ * Return the bit set used to store the Bloom filter.
+ *
+ * @return bit set representing the Bloom filter.
+ */
+ public BitSet getBitSet()
+ {
+ return bitset;
+ }
+
+ public void setBitSet(BitSet bitset)
+ {
+ this.bitset = bitset;
+ }
+
+ /**
+ * Returns the number of bits in the Bloom filter. Use count() to retrieve the
+ * number of inserted elements.
+ *
+ * @return the size of the bitset used by the Bloom filter.
+ */
+ public int size()
+ {
+ return this.bitSetSize;
+ }
+
+ /**
+ * Returns the number of elements added to the Bloom filter after it was
+ * constructed or after clear() was called.
+ *
+ * @return number of elements added to the Bloom filter.
+ */
+ public int count()
+ {
+ return this.numberOfAddedElements;
+ }
+
+ /**
+ * Returns the expected number of elements to be inserted into the filter.
+ * This value is the same value as the one passed to the constructor.
+ *
+ * @return expected number of elements.
+ */
+ public int getExpectedNumberOfElements()
+ {
+ return expectedNumberOfFilterElements;
+ }
+
+ /**
+ * Get actual number of bits per element based on the number of elements that
+ * have currently been inserted and the length of the Bloom filter. See also
+ * getExpectedBitsPerElement().
+ *
+ * @return number of bits per element.
+ */
+ public double getBitsPerElement()
+ {
+ return this.bitSetSize / (double)numberOfAddedElements;
+ }
+
+ /**
+ * Set the hasher in the Bloom filter.
+ *
+ * @param hasher
+ * is the hash function to set.
+ */
+
+ public void setHasher(HashFunction hasher)
+ {
+ this.hasher = hasher;
+ }
+
+ public static final class HashFunction
+ {
+ private static final long SEED = 0x7f3a21eaL;
+ private static final long X64_128_C1 = 0x87c37b91114253d5L;
+ private static final long X64_128_C2 = 0x4cf5ad432745937fL;
+ /**
+ * Helps convert a byte into its unsigned value
+ */
+ public static final int UNSIGNED_MASK = 0xff;
+
+ /**
+ * Return the 64-bit hash of the given slice (a MurmurHash3 x64_128 variant,
+ * keeping the first 64 bits of the result).
+ *
+ * @param slice
+ * the slice to be hashed
+ *
+ * @return the generated hash value
+ */
+ public long hash(Slice slice)
+ {
+ long h1 = SEED;
+ long h2 = SEED;
+
+ //offset relative to the beginning of the slice
+ int relativeOffset = 0;
+ while (slice.length - relativeOffset >= 16) {
+ long k1 = getLong(slice, slice.offset + relativeOffset);
+ relativeOffset += 8; //advance by the size of a long in bytes
+ long k2 = getLong(slice, slice.offset + relativeOffset);
+ relativeOffset += 8;
+
+ h1 ^= mixK1(k1);
+
+ h1 = Long.rotateLeft(h1, 27);
+ h1 += h2;
+ h1 = h1 * 5 + 0x52dce729;
+
+ h2 ^= mixK2(k2);
+
+ h2 = Long.rotateLeft(h2, 31);
+ h2 += h1;
+ h2 = h2 * 5 + 0x38495ab5;
+ }
+
+ if (slice.length > relativeOffset) {
+ long k1 = 0;
+ long k2 = 0;
+ int absoluteOffset = slice.offset + relativeOffset;
+ switch (slice.length - relativeOffset) {
+ case 15:
+ k2 ^= (long)(slice.buffer[absoluteOffset + 14] & UNSIGNED_MASK) << 48; // fall through
+
+ case 14:
+ k2 ^= (long)(slice.buffer[absoluteOffset + 13] & UNSIGNED_MASK) << 40; // fall through
+
+ case 13:
+ k2 ^= (long)(slice.buffer[absoluteOffset + 12] & UNSIGNED_MASK) << 32; // fall through
+
+ case 12:
+ k2 ^= (long)(slice.buffer[absoluteOffset + 11] & UNSIGNED_MASK) << 24; // fall through
+
+ case 11:
+ k2 ^= (long)(slice.buffer[absoluteOffset + 10] & UNSIGNED_MASK) << 16; // fall through
+
+ case 10:
+ k2 ^= (long)(slice.buffer[absoluteOffset + 9] & UNSIGNED_MASK) << 8; // fall through
+
+ case 9:
+ k2 ^= slice.buffer[absoluteOffset + 8] & UNSIGNED_MASK; // fall through
+
+ case 8:
+ k1 ^= getLong(slice, absoluteOffset);
+ break;
+
+ case 7:
+ k1 ^= (long)(slice.buffer[absoluteOffset + 6] & UNSIGNED_MASK) << 48; // fall through
+
+ case 6:
+ k1 ^= (long)(slice.buffer[absoluteOffset + 5] & UNSIGNED_MASK) << 40; // fall through
+
+ case 5:
+ k1 ^= (long)(slice.buffer[absoluteOffset + 4] & UNSIGNED_MASK) << 32; // fall through
+
+ case 4:
+ k1 ^= (long)(slice.buffer[absoluteOffset + 3] & UNSIGNED_MASK) << 24; // fall through
+
+ case 3:
+ k1 ^= (long)(slice.buffer[absoluteOffset + 2] & UNSIGNED_MASK) << 16; // fall through
+
+ case 2:
+ k1 ^= (long)(slice.buffer[absoluteOffset + 1] & UNSIGNED_MASK) << 8; // fall through
+
+ case 1:
+ k1 ^= slice.buffer[absoluteOffset] & UNSIGNED_MASK;
+ break;
+
+ default:
+ throw new AssertionError("Code should not reach here!");
+ }
+
+ // mix
+ h1 ^= mixK1(k1);
+ h2 ^= mixK2(k2);
+ }
+
+ // ----------
+ // finalization
+
+ h1 ^= slice.length;
+ h2 ^= slice.length;
+
+ h1 += h2;
+ h2 += h1;
+
+ h1 = fmix64(h1);
+ h2 = fmix64(h2);
+
+ h1 += h2;
+ h2 += h1;
+
+ return h1;
+ }
+
+ private static long getLong(Slice slice, int absoluteOffset)
+ {
+ return ((((long)slice.buffer[absoluteOffset++]) << 56) |
+ (((long)slice.buffer[absoluteOffset++] & 0xff) << 48) |
+ (((long)slice.buffer[absoluteOffset++] & 0xff) << 40) |
+ (((long)slice.buffer[absoluteOffset++] & 0xff) << 32) |
+ (((long)slice.buffer[absoluteOffset++] & 0xff) << 24) |
+ (((long)slice.buffer[absoluteOffset++] & 0xff) << 16) |
+ (((long)slice.buffer[absoluteOffset++] & 0xff) << 8) |
+ (((long)slice.buffer[absoluteOffset++] & 0xff) ));
+ }
+
+ private static long mixK1(long k1)
+ {
+ k1 *= X64_128_C1;
+ k1 = Long.rotateLeft(k1, 31);
+ k1 *= X64_128_C2;
+
+ return k1;
+ }
+
+ private static long mixK2(long k2)
+ {
+ k2 *= X64_128_C2;
+ k2 = Long.rotateLeft(k2, 33);
+ k2 *= X64_128_C1;
+
+ return k2;
+ }
+
+ private static long fmix64(long k)
+ {
+ k ^= k >>> 33;
+ k *= 0xff51afd7ed558ccdL;
+ k ^= k >>> 33;
+ k *= 0xc4ceb9fe1a85ec53L;
+ k ^= k >>> 33;
+
+ return k;
+ }
+ }
+}
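
The constructor's sizing math can be checked with a short worked example; this
sketch only re-derives the formulas used above and is not part of the patch:

  // k = ceil(-log2(p)) hash functions, c = k / ln(2) bits per element, m = ceil(c * n) bits
  double p = 0.01;                                      // desired false positive probability
  int n = 1000000;                                      // expected number of elements
  int k = (int)Math.ceil(-(Math.log(p) / Math.log(2))); // k = 7
  double c = k / Math.log(2);                           // ~10.1 bits per element
  int m = (int)Math.ceil(c * n);                        // ~10.1 million bits (~1.2 MB)
  // predicted residual FP rate (1 - e^(-k*n/m))^k is ~0.008, within the requested 0.01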
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
index 8a63f5a..8bbc3dc 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
@@ -28,6 +28,7 @@
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
+import org.apache.apex.malhar.lib.state.managed.Bucket.DefaultBucket;
import org.apache.apex.malhar.lib.state.managed.Bucket.ReadSource;
import org.apache.apex.malhar.lib.utils.serde.AffixSerde;
import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer;
@@ -52,6 +53,9 @@
@Override
protected void starting(Description description)
{
+ //many tests bypass the normal workflow and write directly to files, so disable the bloom filter by default
+ DefaultBucket.setDisableBloomFilterByDefault(true);
+
TestUtils.deleteTargetTestClassFolder(description);
managedStateContext = new MockManagedStateContext(ManagedStateTestUtils.getOperatorContext(9));
applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
@@ -193,6 +197,8 @@
@Test
public void testFreeMemory() throws IOException
{
+ DefaultBucket.setDisableBloomFilterByDefault(false);
+
testMeta.defaultBucket.setup(testMeta.managedStateContext);
testGetFromReader();
long initSize = testMeta.defaultBucket.getSizeInBytes();
@@ -227,4 +233,33 @@
testMeta.defaultBucket.teardown();
}
+
+ @Test
+ public void testBloomFilter() throws IOException
+ {
+ testMeta.defaultBucket.setDisableBloomFilter(false);
+ testMeta.defaultBucket.setup(testMeta.managedStateContext);
+ final int itemSize = 1000;
+ final int bucketId = 1;
+ for (int i = 0; i < itemSize; i += 2) {
+ //insert only even keys
+ Slice keyAndValue = ManagedStateTestUtils.getSliceFor(String.valueOf(i));
+ testMeta.defaultBucket.put(keyAndValue, bucketId, keyAndValue);
+ }
+
+ testMeta.defaultBucket.freeMemory(Long.MAX_VALUE);
+
+ for (int i = 0; i < itemSize; ++i) {
+ //only even keys were inserted, so odd keys should be absent
+ Slice key = ManagedStateTestUtils.getSliceFor(String.valueOf(i));
+ Slice value = testMeta.defaultBucket.get(key, bucketId, ReadSource.ALL);
+ if ((i & 0x01) == 0) {
+ Assert.assertEquals(key, value);
+ } else {
+ Assert.assertNull(value);
+ }
+ }
+
+ testMeta.defaultBucket.teardown();
+ }
}
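
With the setters added to DefaultBucket, user code can opt in per bucket before
setup; a sketch assuming a DefaultBucket instance and a managed state context are
already in scope:

  bucket.setDisableBloomFilter(false);           // opt in for this bucket
  bucket.setBloomFilterExpectedElements(500000); // expected number of distinct keys
  bucket.setup(managedStateContext);             // the filter is created lazily in setup()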
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java
index 2882828..d5f4856 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java
@@ -30,6 +30,8 @@
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
+import org.apache.apex.malhar.lib.state.managed.Bucket.DefaultBucket;
+
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
@@ -83,6 +85,7 @@
Slice zero = ManagedStateTestUtils.getSliceFor("0");
long time = System.currentTimeMillis();
+ DefaultBucket.setDisableBloomFilterByDefault(true);
testMeta.managedState.setup(testMeta.operatorContext);
Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, time);
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
index 42ab187..faa6f71 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
@@ -30,6 +30,8 @@
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
+import org.apache.apex.malhar.lib.state.managed.Bucket.DefaultBucket;
+
import com.datatorrent.api.Context;
import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
import com.datatorrent.lib.util.TestUtils;
@@ -116,6 +118,8 @@
@Test
public void testSyncGetFromFiles() throws IOException, ExecutionException, InterruptedException
{
+ DefaultBucket.setDisableBloomFilterByDefault(true);
+
Slice zero = ManagedStateTestUtils.getSliceFor("0");
long time = System.currentTimeMillis();
@@ -138,6 +142,8 @@
@Test
public void testAsyncSyncGetFromFiles() throws IOException, ExecutionException, InterruptedException
{
+ DefaultBucket.setDisableBloomFilterByDefault(true);
+
Slice zero = ManagedStateTestUtils.getSliceFor("0");
long time = System.currentTimeMillis();
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/SliceBloomFilterTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/SliceBloomFilterTest.java
new file mode 100644
index 0000000..9fce3a5
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/SliceBloomFilterTest.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.netlet.util.Slice;
+
+public class SliceBloomFilterTest
+{
+ private int loop = 100000;
+
+ @Test
+ public void testBloomFilterForBytes()
+ {
+ final int maxSliceLength = 1000;
+ Random random = new Random();
+ final byte[] bytes = new byte[loop + maxSliceLength];
+ random.nextBytes(bytes);
+
+ SliceBloomFilter bloomFilter = new SliceBloomFilter(100000, 0.01);
+ for (int i = 0; i < loop; i++) {
+ bloomFilter.put(new Slice(bytes, i, i % maxSliceLength + 1));
+ }
+
+ for (int i = 0; i < loop; i++) {
+ Assert.assertTrue(bloomFilter.mightContain(new Slice(bytes, i, i % maxSliceLength + 1)));
+ }
+ }
+
+ @Test
+ public void testBloomFilterForInt()
+ {
+ testBloomFilterForInt(2);
+ testBloomFilterForInt(3);
+ testBloomFilterForInt(5);
+ testBloomFilterForInt(7);
+ }
+
+ public void testBloomFilterForInt(int span)
+ {
+ double expectedFalseProbability = 0.3;
+ SerializationBuffer buffer = SerializationBuffer.READ_BUFFER;
+
+ SliceBloomFilter bloomFilter = new SliceBloomFilter(loop, expectedFalseProbability);
+
+ for (int i = 0; i < loop; i++) {
+ if (i % span == 0) {
+ buffer.writeInt(i);
+ bloomFilter.put(buffer.toSlice());
+ }
+ }
+ buffer.getWindowedBlockStream().releaseAllFreeMemory();
+
+ int falsePositive = 0;
+ for (int i = 0; i < loop; i++) {
+ buffer.writeInt(i);
+ if (!bloomFilter.mightContain(buffer.toSlice())) {
+ Assert.assertTrue(i % span != 0);
+ } else {
+ // the filter reports the key as present
+ if (i % span != 0) {
+ // but it was never inserted: a false positive
+ falsePositive++;
+ }
+ }
+ }
+ buffer.getWindowedBlockStream().releaseAllFreeMemory();
+ // verify the measured false positive rate over the non-inserted keys
+ double falsePositiveProb = ((double)falsePositive) / (loop - loop / span);
+ Assert.assertTrue(falsePositiveProb <= expectedFalseProbability);
+
+ for (int i = 0; i < loop; i++) {
+ if (i % span == 0) {
+ buffer.writeInt(i);
+ Assert.assertTrue(bloomFilter.mightContain(buffer.toSlice()));
+ }
+ }
+ buffer.getWindowedBlockStream().releaseAllFreeMemory();
+ }
+
+ private static class FilterOperator extends BaseOperator
+ {
+ private SliceBloomFilter bloomFilter = new SliceBloomFilter(10000, 0.01);
+ private SerializationBuffer buffer = SerializationBuffer.READ_BUFFER;
+
+ public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
+ {
+ @Override
+ public void process(String tuple)
+ {
+ processTuple(tuple);
+ }
+ };
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ }
+
+ public void processTuple(String tuple)
+ {
+ buffer.writeString(tuple);
+ bloomFilter.mightContain(buffer.toSlice());
+ buffer.reset();
+ }
+ }
+
+ private static class TestInputOperator extends BaseOperator implements InputOperator
+ {
+ public final transient DefaultOutputPort<String> data = new DefaultOutputPort<String>();
+ private int current = 0;
+
+ @Override
+ public void emitTuples()
+ {
+ data.emit("" + current++);
+ }
+ }
+
+ /**
+ * Verify that SliceBloomFilter can be used inside an operator, e.g. that it is serializable.
+ * @throws Exception
+ */
+ @Test
+ public void testBloomFilterForApplication() throws Exception
+ {
+ Configuration conf = new Configuration(false);
+
+ LocalMode lma = LocalMode.newInstance();
+ DAG dag = lma.getDAG();
+
+ TestInputOperator generator = new TestInputOperator();
+ dag.addOperator("Generator", generator);
+
+ FilterOperator filterOperator = new FilterOperator();
+ dag.addOperator("filterOperator", filterOperator);
+ dag.addStream("Data", generator.data, filterOperator.input).setLocality(Locality.CONTAINER_LOCAL);
+
+ StreamingApplication app = new StreamingApplication()
+ {
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ }
+ };
+
+ lma.prepareDAG(app, conf);
+
+ // Create local cluster
+ final LocalMode.Controller lc = lma.getController();
+ lc.run(3000);
+
+ lc.shutdown();
+ }
+}