APEXMALHAR-2366 #resolve #comment Apply BloomFilter to Bucket, use internal BloomFilter
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
index 6292fe2..8e1112e 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
@@ -232,6 +232,12 @@
 
     protected ConcurrentLinkedQueue<Long> windowsForFreeMemory = new ConcurrentLinkedQueue<>();
 
+    private static boolean disableBloomFilterByDefault = false;
+    private boolean disableBloomFilter = disableBloomFilterByDefault;
+    private static int bloomFilterDefaultBitSize = 1000000;
+    private SliceBloomFilter bloomFilter = null;
+    private int bloomFilterBitSize = bloomFilterDefaultBitSize;
+
     private DefaultBucket()
     {
       //for kryo
@@ -247,6 +253,9 @@
     public void setup(@NotNull ManagedStateContext managedStateContext)
     {
       this.managedStateContext = Preconditions.checkNotNull(managedStateContext, "managed state context");
+      if (!disableBloomFilter && bloomFilter == null) {
+        bloomFilter = new SliceBloomFilter(bloomFilterBitSize, 0.99);
+      }
     }
 
     @Override
@@ -353,6 +362,33 @@
       }
     }
 
+
+    private int filteredCount = 0;
+    private int unfilteredCount = 0;
+    private static final int STATISTICS_BARRIER_NUM = 10000;
+    /**
+     * Test the result of using bloom filter and remove it if it not helpful for improving the performance.
+     *
+     * @param mightContain true if collection might contain the value
+     */
+    private void verifyBloomFilter(boolean mightContain)
+    {
+      if (!mightContain) {
+        filteredCount++;
+      } else {
+        unfilteredCount++;
+      }
+
+      if (unfilteredCount + filteredCount > STATISTICS_BARRIER_NUM && unfilteredCount > filteredCount * 4) {
+        unloadBloomFilter();
+      }
+    }
+
+    private void unloadBloomFilter()
+    {
+      bloomFilter = null;
+    }
+
     /**
      * Returns the value for the key from a valid time-bucket reader. Here, valid means the time bucket which is not purgeable.
      * If the timebucketAssigner is of type MovingBoundaryTimeBucketAssigner and the time bucket is purgeable, then return null.
@@ -362,11 +398,21 @@
      */
     private BucketedValue getValueFromTimeBucketReader(Slice key, long timeBucket)
     {
-
       if (managedStateContext.getTimeBucketAssigner() instanceof MovingBoundaryTimeBucketAssigner &&
           timeBucket <= ((MovingBoundaryTimeBucketAssigner)managedStateContext.getTimeBucketAssigner()).getLowestPurgeableTimeBucket()) {
         return null;
       }
+
+      if (bloomFilter != null) {
+        boolean mightContain = bloomFilter.mightContain(key);
+
+        verifyBloomFilter(mightContain);
+
+        if (!mightContain) {
+          return null;
+        }
+      }
+
       FileAccess.FileReader fileReader = readers.get(timeBucket);
       if (fileReader != null) {
         return readValue(fileReader, key, timeBucket);
@@ -417,6 +463,7 @@
     {
       // This call is lightweight
       releaseMemory();
+
       key = SliceUtils.toBufferSlice(key);
       value = SliceUtils.toBufferSlice(value);
 
@@ -460,6 +507,14 @@
         entryIter.remove();
 
         for (Map.Entry<Slice, BucketedValue> entry : windowData.entrySet()) {
+          /**
+           * The data still in memory and reachable before the memory released
+           * So put key into bloom filter here
+           */
+          if (bloomFilter != null) {
+            bloomFilter.put(entry.getKey());
+          }
+
           memoryFreed += entry.getKey().length + entry.getValue().getSize();
         }
       }
@@ -511,6 +566,7 @@
     public Map<Slice, BucketedValue> checkpoint(long windowId)
     {
       releaseMemory();
+
       try {
         //transferring the data from flash to check-pointed state in finally block and re-initializing the flash.
         return flash;
@@ -635,6 +691,48 @@
       return valueStream;
     }
 
+    public static int getBloomFilterDefaultBitSize()
+    {
+      return bloomFilterDefaultBitSize;
+    }
+
+    public static void setBloomFilterDefaultBitSize(int bloomFilterDefaultBitSize)
+    {
+      DefaultBucket.bloomFilterDefaultBitSize = bloomFilterDefaultBitSize;
+    }
+
+    public int getBloomFilterBitSize()
+    {
+      return bloomFilterBitSize;
+    }
+
+    public void setBloomFilterBitSize(int bloomFilterBitSize)
+    {
+      this.bloomFilterBitSize = bloomFilterBitSize;
+    }
+
+    public static boolean isDisableBloomFilterByDefault()
+    {
+      return disableBloomFilterByDefault;
+    }
+
+    public static void setDisableBloomFilterByDefault(boolean disableBloomFilterByDefault)
+    {
+      DefaultBucket.disableBloomFilterByDefault = disableBloomFilterByDefault;
+    }
+
+    public boolean isDisableBloomFilter()
+    {
+      return disableBloomFilter;
+    }
+
+    public void setDisableBloomFilter(boolean disableBloomFilter)
+    {
+      this.disableBloomFilter = disableBloomFilter;
+      this.unloadBloomFilter();
+    }
+
+
     private static final Logger LOG = LoggerFactory.getLogger(DefaultBucket.class);
   }
 }
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/SliceBloomFilter.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/SliceBloomFilter.java
new file mode 100644
index 0000000..4fd9e02
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/SliceBloomFilter.java
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.util.BitSet;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * This class implemented BloomFilter algorithm, the key is Slice
+ *
+ */
+public class SliceBloomFilter
+{
+  private BitSet bitset;
+  private int bitSetSize;
+  private int expectedNumberOfFilterElements; // expected (maximum) number of elements to be added
+  private int numberOfAddedElements; // number of elements actually added to the Bloom filter
+  private int numberOfHashes; // number of hash functions
+  protected transient HashFunction hasher = new HashFunction();
+
+  /**
+   * Set the attributes to the empty Bloom filter. The total length of the Bloom
+   * filter will be bitsPerElement*expectedNumberOfFilterElements.
+   *
+   * @param bitsPerElement
+   *          is the number of bits used per element.
+   * @param expectedNumberOfFilterElements
+   *          is the expected number of elements the filter will contain.
+   * @param numberOfHashes
+   *          is the number of hash functions used.
+   */
+  private void SetAttributes(double bitsPerElement, int expectedNumberOfFilterElements, int numberOfHashes)
+  {
+    this.expectedNumberOfFilterElements = expectedNumberOfFilterElements;
+    this.numberOfHashes = numberOfHashes;
+    this.bitSetSize = (int)Math.ceil(bitsPerElement * expectedNumberOfFilterElements);
+    numberOfAddedElements = 0;
+    this.bitset = new BitSet(bitSetSize);
+  }
+
+  private SliceBloomFilter()
+  {
+    //for kyro
+  }
+
+  /**
+   * Constructs an empty Bloom filter with a given false positive probability.
+   *
+   * @param expectedNumberOfElements
+   *          is the expected number of elements the filter will contain.
+   * @param falsePositiveProbability
+   *          is the desired false positive probability.
+   */
+  public SliceBloomFilter(int expectedNumberOfElements, double falsePositiveProbability)
+  {
+    if (this.bitset == null) {
+      SetAttributes(Math.ceil(-(Math.log(falsePositiveProbability) / Math.log(2))) / Math.log(2), // c = k / ln(2)
+          expectedNumberOfElements, (int)Math.ceil(-(Math.log(falsePositiveProbability) / Math.log(2))));
+    }
+  }
+
+  /**
+   * Generate integer array based on the hash function till the number of
+   * hashes.
+   *
+   * @param slice
+   *          specifies input slice.
+   * @return array of int-sized hashes
+   */
+  private int[] createHashes(Slice slice)
+  {
+    int[] result = new int[numberOfHashes];
+    long hash64 = hasher.hash(slice);
+    // apply the less hashing technique
+    int hash1 = (int)hash64;
+    int hash2 = (int)(hash64 >>> 32);
+    for (int i = 1; i <= numberOfHashes; i++) {
+      int nextHash = hash1 + i * hash2;
+      if (nextHash < 0) {
+        nextHash = ~nextHash;
+      }
+      result[i - 1] = nextHash;
+    }
+    return result;
+  }
+
+  /**
+   * Calculates the expected probability of false positives based on the number
+   * of expected filter elements and the size of the Bloom filter.
+   *
+   * @return expected probability of false positives.
+   */
+  public double expectedFalsePositiveProbability()
+  {
+    return getFalsePositiveProbability(expectedNumberOfFilterElements);
+  }
+
+  /**
+   * Calculate the probability of a false positive given the specified number of
+   * inserted elements.
+   *
+   * @param numberOfElements
+   *          number of inserted elements.
+   * @return probability of a false positive.
+   */
+  public double getFalsePositiveProbability(double numberOfElements)
+  {
+    // (1 - e^(-k * n / m)) ^ k
+    return Math.pow((1 - Math.exp(-numberOfHashes * numberOfElements / bitSetSize)), numberOfHashes);
+  }
+
+  /**
+   * Get the current probability of a false positive. The probability is
+   * calculated from the size of the Bloom filter and the current number of
+   * elements added to it.
+   *
+   * @return probability of false positives.
+   */
+  public double getFalsePositiveProbability()
+  {
+    return getFalsePositiveProbability(numberOfAddedElements);
+  }
+
+  /**
+   * Returns the value chosen for numberOfHashes.<br />
+   * <br />
+   * numberOfHashes is the optimal number of hash functions based on the size of
+   * the Bloom filter and the expected number of inserted elements.
+   *
+   * @return optimal numberOfHashes.
+   */
+  public int getNumberOfHashes()
+  {
+    return numberOfHashes;
+  }
+
+  /**
+   * Sets all bits to false in the Bloom filter.
+   */
+  public void clear()
+  {
+    bitset.clear();
+    numberOfAddedElements = 0;
+  }
+
+  /**
+   * Adds a slice of byte array to the Bloom filter.
+   *
+   * @param slice
+   *          slice of byte array to add to the Bloom filter.
+   */
+  public void put(Slice slice)
+  {
+    int[] hashes = createHashes(slice);
+    for (int hash : hashes) {
+      bitset.set(Math.abs(hash % bitSetSize), true);
+    }
+    numberOfAddedElements++;
+  }
+
+  /**
+   * Returns true if the slice of byte array could have been inserted into the Bloom
+   * filter. Use getFalsePositiveProbability() to calculate the probability of
+   * this being correct.
+   *
+   * @param slice
+   *          slice of byte array to check.
+   * @return true if the array could have been inserted into the Bloom filter.
+   */
+  public boolean mightContain(Slice slice)
+  {
+    int[] hashes = createHashes(slice);
+    for (int hash : hashes) {
+      if (!bitset.get(Math.abs(hash % bitSetSize))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Read a single bit from the Bloom filter.
+   *
+   * @param bit
+   *          the bit to read.
+   * @return true if the bit is set, false if it is not.
+   */
+  public boolean getBit(int bit)
+  {
+    return bitset.get(bit);
+  }
+
+  /**
+   * Set a single bit in the Bloom filter.
+   *
+   * @param bit
+   *          is the bit to set.
+   * @param value
+   *          If true, the bit is set. If false, the bit is cleared.
+   */
+  public void setBit(int bit, boolean value)
+  {
+    bitset.set(bit, value);
+  }
+
+  /**
+   * Return the bit set used to store the Bloom filter.
+   *
+   * @return bit set representing the Bloom filter.
+   */
+  public BitSet getBitSet()
+  {
+    return bitset;
+  }
+
+  public void setBitSet(BitSet bitset)
+  {
+    this.bitset = bitset;
+  }
+
+  /**
+   * Returns the number of bits in the Bloom filter. Use count() to retrieve the
+   * number of inserted elements.
+   *
+   * @return the size of the bitset used by the Bloom filter.
+   */
+  public int size()
+  {
+    return this.bitSetSize;
+  }
+
+  /**
+   * Returns the number of elements added to the Bloom filter after it was
+   * constructed or after clear() was called.
+   *
+   * @return number of elements added to the Bloom filter.
+   */
+  public int count()
+  {
+    return this.numberOfAddedElements;
+  }
+
+  /**
+   * Returns the expected number of elements to be inserted into the filter.
+   * This value is the same value as the one passed to the constructor.
+   *
+   * @return expected number of elements.
+   */
+  public int getExpectedNumberOfElements()
+  {
+    return expectedNumberOfFilterElements;
+  }
+
+  /**
+   * Get actual number of bits per element based on the number of elements that
+   * have currently been inserted and the length of the Bloom filter. See also
+   * getExpectedBitsPerElement().
+   *
+   * @return number of bits per element.
+   */
+  public double getBitsPerElement()
+  {
+    return this.bitSetSize / (double)numberOfAddedElements;
+  }
+
+  /**
+   * Set the hasher in the Bloom filter.
+   *
+   * @param hasher
+   *          is the hash function to set.
+   */
+
+  public void setHasher(HashFunction hasher)
+  {
+    this.hasher = hasher;
+  }
+
+  public static final class HashFunction
+  {
+    private static final long SEED = 0x7f3a21eaL;
+    private static long X64_128_C1 = 0x87c37b91114253d5L;
+    private static long X64_128_C2 = 0x4cf5ad432745937fL;
+    /**
+     * Helps convert a byte into its unsigned value
+     */
+    public static final int UNSIGNED_MASK = 0xff;
+
+    /**
+     * Return the hash of the bytes as long.
+     *
+     * @param bytes
+     *          the bytes to be hashed
+     *
+     * @return the generated hash value
+     */
+    public long hash(Slice slice)
+    {
+      long h1 = SEED;
+      long h2 = SEED;
+
+      //the offset related to the begin of slice
+      int relativeOffset = 0;
+      while (slice.length - relativeOffset >= 16) {
+        long k1 = getLong(slice, slice.offset + relativeOffset);
+        relativeOffset += 8; //size of long count by int
+        long k2 = getLong(slice, slice.offset + relativeOffset);
+        relativeOffset += 8;
+
+        h1 ^= mixK1(k1);
+
+        h1 = Long.rotateLeft(h1, 27);
+        h1 += h2;
+        h1 = h1 * 5 + 0x52dce729;
+
+        h2 ^= mixK2(k2);
+
+        h2 = Long.rotateLeft(h2, 31);
+        h2 += h1;
+        h2 = h2 * 5 + 0x38495ab5;
+      }
+
+      if (slice.length > relativeOffset) {
+        long k1 = 0;
+        long k2 = 0;
+        int absoluteOffset = slice.offset + relativeOffset;
+        switch (slice.length - relativeOffset) {
+          case 15:
+            k2 ^= (long)(slice.buffer[absoluteOffset + 14] & UNSIGNED_MASK) << 48; // fall through
+
+          case 14:
+            k2 ^= (long)(slice.buffer[absoluteOffset + 13] & UNSIGNED_MASK) << 40; // fall through
+
+          case 13:
+            k2 ^= (long)(slice.buffer[absoluteOffset + 12] & UNSIGNED_MASK) << 32; // fall through
+
+          case 12:
+            k2 ^= (long)(slice.buffer[absoluteOffset + 11] & UNSIGNED_MASK) << 24; // fall through
+
+          case 11:
+            k2 ^= (long)(slice.buffer[absoluteOffset + 10] & UNSIGNED_MASK) << 16; // fall through
+
+          case 10:
+            k2 ^= (long)(slice.buffer[absoluteOffset + 9] & UNSIGNED_MASK) << 8; // fall through
+
+          case 9:
+            k2 ^= slice.buffer[absoluteOffset + 8] & UNSIGNED_MASK; // fall through
+
+          case 8:
+            k1 ^= getLong(slice, absoluteOffset);
+            break;
+
+          case 7:
+            k1 ^= (long)(slice.buffer[absoluteOffset + 6] & UNSIGNED_MASK) << 48; // fall through
+
+          case 6:
+            k1 ^= (long)(slice.buffer[absoluteOffset + 5] & UNSIGNED_MASK) << 40; // fall through
+
+          case 5:
+            k1 ^= (long)(slice.buffer[absoluteOffset + 4] & UNSIGNED_MASK) << 32; // fall through
+
+          case 4:
+            k1 ^= (long)(slice.buffer[absoluteOffset + 3] & UNSIGNED_MASK) << 24; // fall through
+
+          case 3:
+            k1 ^= (long)(slice.buffer[absoluteOffset + 2] & UNSIGNED_MASK) << 16; // fall through
+
+          case 2:
+            k1 ^= (long)(slice.buffer[absoluteOffset + 1] & UNSIGNED_MASK) << 8; // fall through
+
+          case 1:
+            k1 ^= slice.buffer[absoluteOffset] & UNSIGNED_MASK;
+            break;
+
+          default:
+            throw new AssertionError("Code should not reach here!");
+        }
+
+        // mix
+        h1 ^= mixK1(k1);
+        h2 ^= mixK2(k2);
+      }
+
+      // ----------
+      // finalization
+
+      h1 ^= slice.length;
+      h2 ^= slice.length;
+
+      h1 += h2;
+      h2 += h1;
+
+      h1 = fmix64(h1);
+      h2 = fmix64(h2);
+
+      h1 += h2;
+      h2 += h1;
+
+      return h1;
+    }
+
+    private static long getLong(Slice slice, int absoluteOffset)
+    {
+      return ((((long)slice.buffer[absoluteOffset++]) << 56) |
+          (((long)slice.buffer[absoluteOffset++] & 0xff) << 48) |
+          (((long)slice.buffer[absoluteOffset++] & 0xff) << 40) |
+          (((long)slice.buffer[absoluteOffset++] & 0xff) << 32) |
+          (((long)slice.buffer[absoluteOffset++] & 0xff) << 24) |
+          (((long)slice.buffer[absoluteOffset++] & 0xff) << 16) |
+          (((long)slice.buffer[absoluteOffset++] & 0xff) <<  8) |
+          (((long)slice.buffer[absoluteOffset++] & 0xff)      ));
+    }
+
+    private static long mixK1(long k1)
+    {
+      k1 *= X64_128_C1;
+      k1 = Long.rotateLeft(k1, 31);
+      k1 *= X64_128_C2;
+
+      return k1;
+    }
+
+    private static long mixK2(long k2)
+    {
+      k2 *= X64_128_C2;
+      k2 = Long.rotateLeft(k2, 33);
+      k2 *= X64_128_C1;
+
+      return k2;
+    }
+
+    private static long fmix64(long k)
+    {
+      k ^= k >>> 33;
+      k *= 0xff51afd7ed558ccdL;
+      k ^= k >>> 33;
+      k *= 0xc4ceb9fe1a85ec53L;
+      k ^= k >>> 33;
+
+      return k;
+    }
+  }
+}
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
index 8a63f5a..8bbc3dc 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
@@ -28,6 +28,7 @@
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
 
+import org.apache.apex.malhar.lib.state.managed.Bucket.DefaultBucket;
 import org.apache.apex.malhar.lib.state.managed.Bucket.ReadSource;
 import org.apache.apex.malhar.lib.utils.serde.AffixSerde;
 import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer;
@@ -52,6 +53,9 @@
     @Override
     protected void starting(Description description)
     {
+      //lots of test case get around the normal workflow and directly write to file. So should disable bloom filter
+      DefaultBucket.setDisableBloomFilterByDefault(true);
+
       TestUtils.deleteTargetTestClassFolder(description);
       managedStateContext = new MockManagedStateContext(ManagedStateTestUtils.getOperatorContext(9));
       applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
@@ -193,6 +197,8 @@
   @Test
   public void testFreeMemory() throws IOException
   {
+    DefaultBucket.setDisableBloomFilterByDefault(false);
+
     testMeta.defaultBucket.setup(testMeta.managedStateContext);
     testGetFromReader();
     long initSize = testMeta.defaultBucket.getSizeInBytes();
@@ -227,4 +233,33 @@
 
     testMeta.defaultBucket.teardown();
   }
+
+  @Test
+  public void testBloomFilter() throws IOException
+  {
+    testMeta.defaultBucket.setDisableBloomFilter(false);
+    testMeta.defaultBucket.setup(testMeta.managedStateContext);
+    final int itemSize = 1000;
+    final int bucketId = 1;
+    for (int i = 0; i < itemSize; i += 2) {
+      //put only even value
+      Slice keyAndValue = ManagedStateTestUtils.getSliceFor(String.valueOf(i));
+      testMeta.defaultBucket.put(keyAndValue, bucketId, keyAndValue);
+    }
+
+    testMeta.defaultBucket.freeMemory(Long.MAX_VALUE);
+
+    for (int i = 0; i < itemSize; ++i) {
+      //put only even value
+      Slice key = ManagedStateTestUtils.getSliceFor(String.valueOf(i));
+      Slice value = testMeta.defaultBucket.get(key, bucketId, ReadSource.ALL);
+      if ((i & 0x01) == 0) {
+        Assert.assertEquals(key, value);
+      } else {
+        Assert.assertTrue(value == null);
+      }
+    }
+
+    testMeta.defaultBucket.teardown();
+  }
 }
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java
index 2882828..d5f4856 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java
@@ -30,6 +30,8 @@
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
 
+import org.apache.apex.malhar.lib.state.managed.Bucket.DefaultBucket;
+
 import com.datatorrent.api.Attribute;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.Context.OperatorContext;
@@ -83,6 +85,7 @@
     Slice zero = ManagedStateTestUtils.getSliceFor("0");
     long time = System.currentTimeMillis();
 
+    DefaultBucket.setDisableBloomFilterByDefault(true);
     testMeta.managedState.setup(testMeta.operatorContext);
 
     Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, time);
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
index 42ab187..faa6f71 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
@@ -30,6 +30,8 @@
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
 
+import org.apache.apex.malhar.lib.state.managed.Bucket.DefaultBucket;
+
 import com.datatorrent.api.Context;
 import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
 import com.datatorrent.lib.util.TestUtils;
@@ -116,6 +118,8 @@
   @Test
   public void testSyncGetFromFiles() throws IOException, ExecutionException, InterruptedException
   {
+    DefaultBucket.setDisableBloomFilterByDefault(true);
+
     Slice zero = ManagedStateTestUtils.getSliceFor("0");
     long time = System.currentTimeMillis();
 
@@ -138,6 +142,8 @@
   @Test
   public void testAsyncSyncGetFromFiles() throws IOException, ExecutionException, InterruptedException
   {
+    DefaultBucket.setDisableBloomFilterByDefault(true);
+
     Slice zero = ManagedStateTestUtils.getSliceFor("0");
     long time = System.currentTimeMillis();
 
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/SliceBloomFilterTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/SliceBloomFilterTest.java
new file mode 100644
index 0000000..9fce3a5
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/SliceBloomFilterTest.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.netlet.util.Slice;
+
+public class SliceBloomFilterTest
+{
+  private int loop = 100000;
+
+  @Test
+  public void testBloomFilterForBytes()
+  {
+    final int maxSliceLength = 1000;
+    Random random = new Random();
+    final byte[] bytes = new byte[loop + maxSliceLength];
+    random.nextBytes(bytes);
+
+    long beginTime = System.currentTimeMillis();
+    SliceBloomFilter bloomFilter = new SliceBloomFilter(100000, 0.99);
+    for (int i = 0; i < loop; i++) {
+      bloomFilter.put(new Slice(bytes, i, i % maxSliceLength + 1));
+    }
+
+    for (int i = 0; i < loop; i++) {
+      Assert.assertTrue(bloomFilter.mightContain(new Slice(bytes, i, i % maxSliceLength + 1)));
+    }
+  }
+
+  @Test
+  public void testBloomFilterForInt()
+  {
+    testBloomFilterForInt(2);
+    testBloomFilterForInt(3);
+    testBloomFilterForInt(5);
+    testBloomFilterForInt(7);
+  }
+
+  public void testBloomFilterForInt(int span)
+  {
+    double expectedFalseProbability = 0.3;
+    SerializationBuffer buffer = SerializationBuffer.READ_BUFFER;
+
+    SliceBloomFilter bloomFilter = new SliceBloomFilter(loop, expectedFalseProbability);
+
+    for (int i = 0; i < loop; i++) {
+      if (i % span == 0) {
+        buffer.writeInt(i);
+        bloomFilter.put(buffer.toSlice());
+      }
+    }
+    buffer.getWindowedBlockStream().releaseAllFreeMemory();
+
+    int falsePositive = 0;
+    for (int i = 0; i < loop; i++) {
+      buffer.writeInt(i);
+      if (!bloomFilter.mightContain(buffer.toSlice())) {
+        Assert.assertTrue(i % span != 0);
+      } else {
+        // BF says its present
+        if (i % 2 != 0) {
+          // But was not there
+          falsePositive++;
+        }
+      }
+    }
+    buffer.getWindowedBlockStream().releaseAllFreeMemory();
+    // Verify false positive prob
+    double falsePositiveProb = falsePositive;
+    falsePositiveProb /= loop;
+    Assert.assertTrue(falsePositiveProb <= expectedFalseProbability);
+
+    for (int i = 0; i < loop; i++) {
+      if (i % span == 0) {
+        buffer.writeInt(i);
+        Assert.assertTrue(bloomFilter.mightContain(buffer.toSlice()));
+      }
+    }
+    buffer.getWindowedBlockStream().releaseAllFreeMemory();
+  }
+
+  private static class FilterOperator extends BaseOperator
+  {
+    private SliceBloomFilter bloomFilter = new SliceBloomFilter(10000, 0.99);
+    private SerializationBuffer buffer = SerializationBuffer.READ_BUFFER;
+
+    public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
+    {
+      @Override
+      public void process(String tuple)
+      {
+        processTuple(tuple);
+      }
+    };
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+    }
+
+    private int count = 0;
+
+    public void processTuple(String tuple)
+    {
+      buffer.writeString(tuple);
+      bloomFilter.mightContain(buffer.toSlice());
+      buffer.reset();
+    }
+  }
+
+  private static class TestInputOperator extends BaseOperator implements InputOperator
+  {
+    public final transient DefaultOutputPort<String> data = new DefaultOutputPort<String>();
+    private int current = 0;
+
+    @Override
+    public void emitTuples()
+    {
+      data.emit("" + current++);
+    }
+  }
+
+  /**
+   * Just test SliceBloomFilter can be used by operator. such as it is serializable etc
+   * @throws Exception
+   */
+  @Test
+  public void testBloomFilterForApplication() throws Exception
+  {
+    Configuration conf = new Configuration(false);
+
+    LocalMode lma = LocalMode.newInstance();
+    DAG dag = lma.getDAG();
+
+    TestInputOperator generator = new TestInputOperator();
+    dag.addOperator("Generator", generator);
+
+    FilterOperator filterOperator = new FilterOperator();
+    dag.addOperator("filterOperator", filterOperator);
+    dag.addStream("Data", generator.data, filterOperator.input).setLocality(Locality.CONTAINER_LOCAL);
+
+    StreamingApplication app = new StreamingApplication()
+    {
+      @Override
+      public void populateDAG(DAG dag, Configuration conf)
+      {
+      }
+    };
+
+    lma.prepareDAG(app, conf);
+
+    // Create local cluster
+    final LocalMode.Controller lc = lma.getController();
+    lc.run(3000);
+
+    lc.shutdown();
+  }
+}