PIG-5255: Improvements to bloom join (satishsaley via rohini)

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1843689 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 9146244..862d9e6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -26,6 +26,8 @@
  
 IMPROVEMENTS
 
+PIG-5255: Improvements to bloom join (satishsaley via rohini)
+
 PIG-5359: Reduce time spent in split serialization (satishsaley via rohini)
 
 PIG-5357: BagFactory interface should support creating a distinct bag from a set (jtolar via rohini)
diff --git a/build.xml b/build.xml
index 2929724..9bb1b12 100644
--- a/build.xml
+++ b/build.xml
@@ -399,6 +399,7 @@
             <include name="joda-time-${joda-time.version}.jar"/>
             <include name="automaton-${automaton.version}.jar"/>
             <include name="jansi-${jansi.version}.jar"/>
+            <include name="RoaringBitmap-shaded-${roaring-bitmap-shaded.version}.jar"/>
         </patternset>
     </fileset>
 
@@ -741,6 +742,7 @@
             <fileset dir="${ivy.lib.dir}" includes="parser-core-${basjes-httpdlog-pigloader.version}.jar"/>
             <fileset dir="${ivy.lib.dir}" includes="ivy-*.jar"/>
             <fileset dir="${ivy.lib.dir}" includes="commons-logging-*.jar"/>
+            <fileset dir="${ivy.lib.dir}" includes="RoaringBitmap-shaded-${roaring-bitmap-shaded.version}.jar"/>
         </copy>
     </target>
 
@@ -1716,6 +1718,10 @@
        <ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" log="${loglevel}"
                  pattern="${ivy.lib.dir.spark}/[artifact]-[revision](-[classifier]).[ext]" conf="spark${sparkversion},hbase${hbaseversion}"/>
        <ivy:cachepath pathid="compile.classpath" conf="compile"/>
+       <exec dir="${basedir}/shade/roaringbitmap" executable="mvn">
+          <arg line="clean package -Droaring.bitmap.version=${roaring-bitmap-shaded.version}"/>
+        </exec>
+       <copy file="${basedir}/shade/roaringbitmap/target/RoaringBitmap-shaded-${roaring-bitmap-shaded.version}.jar" todir="${ivy.lib.dir}"/>
      </target>
 
      <target name="ivy-test" depends="ivy-resolve" description="Retrieve Ivy-managed artifacts for test configuration">
diff --git a/ivy.xml b/ivy.xml
index 0902b18..a93da2a 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -235,6 +235,9 @@
     <dependency org="org.jruby" name="jruby-complete" rev="${jruby.version}" conf="compile->master"/>
     <dependency org="asm" name="asm" rev="${asm.version}" conf="compile->default"/>
 
+    <!-- Dependencies for bloom join -->
+    <dependency org="org.roaringbitmap" name="RoaringBitmap" rev="${roaring-bitmap-shaded.version}" conf="compile->master"/>
+
     <!-- HBase dependency in format for releases higher or equal to 0.95 -->
     <dependency org="org.apache.hbase" name="hbase-client" rev="${hbase1.version}" conf="hbase1->master">
       <artifact name="hbase-client" type="jar"/>
diff --git a/ivy/libraries.properties b/ivy/libraries.properties
index ec71472..1abc967 100644
--- a/ivy/libraries.properties
+++ b/ivy/libraries.properties
@@ -96,4 +96,5 @@
 htrace4.version=4.0.1-incubating
 commons-lang3.version=3.6
 scala-xml.version=1.0.5
-glassfish.el.version=3.0.1-b08
\ No newline at end of file
+glassfish.el.version=3.0.1-b08
+roaring-bitmap-shaded.version=0.7.14
\ No newline at end of file
diff --git a/shade/roaringbitmap/pom.xml b/shade/roaringbitmap/pom.xml
new file mode 100644
index 0000000..a7ee182
--- /dev/null
+++ b/shade/roaringbitmap/pom.xml
@@ -0,0 +1,80 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache</groupId>
+        <artifactId>apache</artifactId>
+        <version>14</version>
+    </parent>
+
+    <groupId>org.apache.pig</groupId>
+    <artifactId>RoaringBitmap-shaded</artifactId>
+    <packaging>jar</packaging>
+    <version>${roaring.bitmap.version}</version>
+
+    <name>Pig RoaringBitmap</name>
+    <url>http://pig.apache.org</url>
+    <prerequisites>
+        <maven>3.0</maven>
+    </prerequisites>
+
+    <modules>
+    </modules>
+
+    <properties>
+        <maven.shade.plugin.version>2.4.3</maven.shade.plugin.version>
+        <roaring.bitmap.version>0.7.14</roaring.bitmap.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.roaringbitmap</groupId>
+            <artifactId>RoaringBitmap</artifactId>
+            <version>${roaring.bitmap.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>${maven.shade.plugin.version}</version>
+                <executions>
+                    <execution>
+                        <id>shade-asm</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <relocations>
+                                <relocation>
+                                    <pattern>org.roaringbitmap</pattern>
+                                    <shadedPattern>org.apache.pig.org.roaringbitmap</shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java
index 1d6f784..e64e08f 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java
@@ -22,17 +22,16 @@
 import java.io.IOException;
 import java.util.Iterator;
 
-import org.apache.hadoop.util.bloom.BloomFilter;
 import org.apache.hadoop.util.bloom.Key;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
-import org.apache.pig.builtin.BuildBloomBase;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.bloom.BloomFilter;
 
 public class BloomPackager extends Packager {
 
@@ -103,10 +102,10 @@
         Iterator<Tuple> iter = bags[0].iterator();
         Tuple tup = iter.next();
         DataByteArray bloomBytes = (DataByteArray) tup.get(0);
-        BloomFilter bloomFilter = BuildBloomBase.bloomIn(bloomBytes);
+        BloomFilter bloomFilter = BloomFilter.bloomIn(bloomBytes);
         while (iter.hasNext()) {
             tup = iter.next();
-            bloomFilter.or(BuildBloomBase.bloomIn((DataByteArray) tup.get(0)));
+            bloomFilter.or(BloomFilter.bloomIn((DataByteArray) tup.get(0)));
         }
 
         Object partition = key;
@@ -160,4 +159,4 @@
     public boolean isBloomCreatedInMap() {
         return bloomCreatedInMap;
     }
-}
\ No newline at end of file
+}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java
index 82b599d..ebedc69 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java
@@ -24,7 +24,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.bloom.BloomFilter;
 import org.apache.hadoop.util.bloom.Key;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
@@ -32,12 +31,12 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.ObjectCache;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
-import org.apache.pig.builtin.BuildBloomBase;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.bloom.BloomFilter;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -106,7 +105,7 @@
                 }
                 Tuple val = (Tuple) reader.getCurrentValue();
                 int index = (int) val.get(0);
-                bloomFilters[index] = BuildBloomBase.bloomIn((DataByteArray) val.get(1));
+                bloomFilters[index] = BloomFilter.bloomIn((DataByteArray) val.get(1));
             }
             ObjectCache.getInstance().cache(cacheKey, bloomFilters);
         } catch (Exception e) {
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java
index 4045942..545f136 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java
@@ -25,7 +25,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.bloom.BloomFilter;
 import org.apache.hadoop.util.bloom.Key;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -37,6 +36,7 @@
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.bloom.BloomFilter;
 import org.apache.pig.impl.io.NullableBytesWritable;
 import org.apache.pig.impl.io.NullableIntWritable;
 import org.apache.pig.impl.io.NullableTuple;
diff --git a/src/org/apache/pig/impl/bloom/BloomFilter.java b/src/org/apache/pig/impl/bloom/BloomFilter.java
new file mode 100644
index 0000000..1487d1e
--- /dev/null
+++ b/src/org/apache/pig/impl/bloom/BloomFilter.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.bloom;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.util.bloom.Filter;
+import org.apache.hadoop.util.bloom.Key;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.org.roaringbitmap.RoaringBitmap;
+
+public class BloomFilter extends Filter {
+    private static final Log LOG = LogFactory.getLog(BloomFilter.class);
+    private static final int VERSION = 1;
+    private HashFunction hash;
+    private RoaringBitmap bitmap;
+    private int hashAlgorithm;
+
+    /**
+     * Read the fields using bloomIn
+     */
+    public BloomFilter() {
+        super();
+    }
+
+    public BloomFilter(int vectorSize, int nbHash, int hashAlgorithm) {
+        super.vectorSize = vectorSize;
+        super.nbHash = nbHash;
+        this.hashAlgorithm = hashAlgorithm;
+        this.hash = new HashFunction(vectorSize, nbHash, hashAlgorithm);
+        this.bitmap = new RoaringBitmap();
+    }
+
+    @Override
+    public void add(Key key) {
+        if(key == null) {
+            throw new NullPointerException("key cannot be null");
+        }
+        int[] h = hash.hash(key);
+        hash.clear();
+        Arrays.sort(h);
+        this.bitmap.or(RoaringBitmap.bitmapOf(h));
+    }
+
+    @Override
+    public boolean membershipTest(Key key) {
+        if(key == null) {
+            throw new NullPointerException("key cannot be null");
+        }
+        int[] h = hash.hash(key);
+        hash.clear();
+        for(int i = 0; i < nbHash; i++) {
+            if(!this.bitmap.contains(h[i])) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public void and(Filter filter) {
+        if(filter == null
+                || !(filter instanceof BloomFilter)
+                || ((BloomFilter)filter).vectorSize != this.vectorSize
+                || ((BloomFilter)filter).nbHash != this.nbHash) {
+              throw new IllegalArgumentException("filters cannot be and-ed");
+        }
+        this.bitmap.and(((BloomFilter)filter).bitmap);
+    }
+
+    @Override
+    public void or(Filter filter) {
+        if(filter == null
+                || !(filter instanceof BloomFilter)
+                || ((BloomFilter)filter).vectorSize != this.vectorSize
+                || ((BloomFilter)filter).nbHash != this.nbHash) {
+              throw new IllegalArgumentException("filters cannot be or-ed");
+        }
+        this.bitmap.or(((BloomFilter)filter).bitmap);
+    }
+
+    @Override
+    public void xor(Filter filter) {
+        if(filter == null
+                || !(filter instanceof BloomFilter)
+                || ((BloomFilter)filter).vectorSize != this.vectorSize
+                || ((BloomFilter)filter).nbHash != this.nbHash) {
+              throw new IllegalArgumentException("filters cannot be xor-ed");
+        }
+        this.bitmap.xor(((BloomFilter)filter).bitmap);
+    }
+
+    @Override
+    public void not() {
+        this.bitmap.flip(0, vectorSize);
+    }
+
+    @Override
+    public String toString() {
+      return this.bitmap.toString();
+    }
+
+    /**
+     * @return size of the the bloomfilter
+     */
+    public int getVectorSize() {
+      return this.vectorSize;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeInt(VERSION);
+        out.writeInt(this.nbHash);
+        out.writeByte(this.hashAlgorithm);
+        out.writeInt(this.vectorSize);
+        this.bitmap.runOptimize();
+        ByteArrayOutputStream bos = compressBitmap();
+        LOG.info("Compressed bitmap from " + String.format("%,8d bytes", this.bitmap.getSizeInBytes())
+        + " to "+ String.format("%,8d bytes", bos.size()));
+        out.writeInt(bos.size());
+        out.write(bos.toByteArray());
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        int ver = in.readInt();
+        if (ver == VERSION) {
+            this.nbHash = in.readInt();
+            this.hashAlgorithm = in.readByte();
+        } else {
+            throw new IOException("Unsupported version: " + ver);
+        }
+        this.vectorSize = in.readInt();
+        this.hash = new HashFunction(this.vectorSize, this.nbHash, this.hashAlgorithm);
+        this.bitmap = new RoaringBitmap();
+        int compressedSize = in.readInt();
+        byte[] buf = new byte[compressedSize];
+        in.readFully(buf);
+        this.bitmap.deserialize(decompressBitmap(buf));
+    }
+
+    public static BloomFilter bloomIn(DataByteArray b) throws IOException {
+        DataInputStream dis = new DataInputStream(new
+            ByteArrayInputStream(b.get()));
+        BloomFilter f = new BloomFilter();
+        f.readFields(dis);
+        return f;
+    }
+    private ByteArrayOutputStream compressBitmap() throws IOException {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        BZip2Codec bzip = new BZip2Codec();
+        bzip.setConf(new Configuration(false));
+        CompressionOutputStream compressionOut = bzip.createOutputStream(bos);
+        DataOutputStream dos = new DataOutputStream(compressionOut);
+        this.bitmap.serialize(dos);
+        compressionOut.finish();
+        dos.flush();
+        return bos;
+    }
+
+    private DataInput decompressBitmap(byte[] buffer) throws IOException {
+        ByteArrayInputStream deCompressedDataBuffer = new ByteArrayInputStream(buffer, 0, buffer.length);
+        BZip2Codec bzip = new BZip2Codec();
+        bzip.setConf(new Configuration(false));
+        CompressionInputStream compressionIn = bzip.createInputStream(deCompressedDataBuffer);
+        DataInputStream inflateIn = new DataInputStream(compressionIn);
+        return inflateIn;
+    }
+}
diff --git a/src/org/apache/pig/impl/bloom/Hash.java b/src/org/apache/pig/impl/bloom/Hash.java
new file mode 100644
index 0000000..180c1dc
--- /dev/null
+++ b/src/org/apache/pig/impl/bloom/Hash.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.bloom;
+
+public abstract class Hash {
+    public static final int MURMUR = 1;
+    public static final int MURMUR3 = MURMUR;
+    public static final int MURMUR3KM = 2;
+    public static final int JENKINS = 3;
+
+    public static int parseHashType(String hashType) {
+        if("murmur".equalsIgnoreCase(hashType)) {
+            return MURMUR3;
+        } else if("murmur3km".equalsIgnoreCase(hashType)) {
+            return MURMUR3KM;
+        } else if("jenkins".equalsIgnoreCase(hashType)) {
+            return JENKINS;
+        }
+        throw new IllegalArgumentException("Hash Algorithm values must be one of - murmur, murmur3km, jenkins");
+    }
+
+    public static Hash getInstance(int hashType) {
+        switch (hashType) {
+            case MURMUR:
+                return new Murmur3Hash();
+            case MURMUR3KM:
+                return new KirschMitzenmacherHash();
+            case JENKINS:
+                return new JenkinsHash();
+        }
+        throw new IllegalArgumentException("Hash type values must be one of - 1 (murmur), 2 (murmur3km), 3 (jenkins)");
+    }
+
+    /**
+     * @param bytes
+     * @param maxValue The maximum hashed value
+     * @param nbHash The number of hashed values
+     * @return
+     */
+    public abstract int[] hash(byte[] bytes, int maxValue, int nbHash);
+
+}
diff --git a/src/org/apache/pig/impl/bloom/HashFunction.java b/src/org/apache/pig/impl/bloom/HashFunction.java
new file mode 100644
index 0000000..95cde97
--- /dev/null
+++ b/src/org/apache/pig/impl/bloom/HashFunction.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.bloom;
+
+import org.apache.hadoop.util.bloom.Key;
+
+public class HashFunction {
+      /** The number of hashed values. */
+      protected int nbHash;
+      /** The maximum highest returned value. */
+      protected int maxValue;
+      /** Hashing algorithm to use. */
+      protected Hash hashAlgorithm;
+      /**
+       * Constructor.
+       * <p>
+       * Builds a hash function that must obey to a given maximum number of returned values and a highest value.
+       * @param maxValue The maximum highest returned value.
+       * @param nbHash The number of resulting hashed values.
+       * @param hashAlgorithm type of the hashing algorithm (see {@link Hash}).
+       */
+      public HashFunction(int maxValue, int nbHash, int hashAlgorithm) {
+          if (maxValue <= 0) {
+              throw new IllegalArgumentException("maxValue must be > 0");
+          }
+          if (nbHash <= 0) {
+              throw new IllegalArgumentException("nbHash must be > 0");
+          }
+
+          this.maxValue = maxValue;
+          this.nbHash = nbHash;
+          this.hashAlgorithm = Hash.getInstance(hashAlgorithm);
+      }
+
+      /** Clears <i>this</i> hash function. A NOOP */
+      public void clear() {
+      }
+
+      /**
+       * Hashes a specified key into several integers.
+       * @param k The specified key.
+       * @return The array of hashed values.
+       */
+      public int[] hash(Key k){
+          byte[] b = k.getBytes();
+          if (b == null) {
+              throw new NullPointerException("buffer reference is null");
+          }
+          if (b.length == 0) {
+              throw new IllegalArgumentException("key length must be > 0");
+          }
+          return hashAlgorithm.hash(b, maxValue, nbHash);
+       }
+}
\ No newline at end of file
diff --git a/src/org/apache/pig/impl/bloom/HashProvider.java b/src/org/apache/pig/impl/bloom/HashProvider.java
new file mode 100644
index 0000000..48a811b
--- /dev/null
+++ b/src/org/apache/pig/impl/bloom/HashProvider.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/*
+ * Copyright Felix Gessert and Florian Bücklers. All rights reserved. Permission is hereby granted,
+ * free of charge, to any person obtaining a copy of this software and associated documentation files
+ * (the "Software"), to deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
+ * Software, and to permit persons to whom the Software is furnished to do so, subject to the following
+ * conditions:
+ * The above copyright notice and this permission notice shall be included in all copies or substantial
+ * portions of the Software.
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
+ * NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+ * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+ * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+package org.apache.pig.impl.bloom;
+
+import java.util.function.BiFunction;
+
+/**
+ * Taken from https://github.com/Baqend/Orestes-Bloomfilter
+ */
+public class HashProvider {
+
+    public static int[] hashMurmur3(byte[] value, int m, int k) {
+        return rejectionSample(HashProvider::murmur3_signed, value, m, k);
+    }
+
+    public static int[] hashKirschMitzenmacher(byte[] value, int m, int k) {
+        int[] result = new int[k];
+        long hash1 = murmur3(0, value);
+        long hash2 = murmur3((int) hash1, value);
+        for (int i = 0; i < k; i++) {
+            result[i] = (int) ((hash1 + i * hash2) % m);
+        }
+        return result;
+    }
+
+    private static long murmur3(int seed, byte[] bytes) {
+        return Integer.toUnsignedLong(murmur3_signed(seed, bytes));
+    }
+
+    private static int murmur3_signed(int seed, byte[] bytes) {
+        int h1 = seed;
+        //Standard in Guava
+        int c1 = 0xcc9e2d51;
+        int c2 = 0x1b873593;
+        int len = bytes.length;
+        int i = 0;
+
+        while (len >= 4) {
+            //process()
+            int k1  = (bytes[i++] & 0xFF);
+                k1 |= (bytes[i++] & 0xFF) << 8;
+                k1 |= (bytes[i++] & 0xFF) << 16;
+                k1 |= (bytes[i++] & 0xFF) << 24;
+
+            k1 *= c1;
+            k1 = Integer.rotateLeft(k1, 15);
+            k1 *= c2;
+
+            h1 ^= k1;
+            h1 = Integer.rotateLeft(h1, 13);
+            h1 = h1 * 5 + 0xe6546b64;
+
+            len -= 4;
+        }
+
+        //processingRemaining()
+        int k1 = 0;
+        switch (len) {
+            case 3:
+                k1 ^= (bytes[i + 2] & 0xFF) << 16;
+                // fall through
+            case 2:
+                k1 ^= (bytes[i + 1] & 0xFF) << 8;
+                // fall through
+            case 1:
+                k1 ^= (bytes[i] & 0xFF);
+
+                k1 *= c1;
+                k1 = Integer.rotateLeft(k1, 15);
+                k1 *= c2;
+                h1 ^= k1;
+        }
+        i += len;
+
+        //makeHash()
+        h1 ^= i;
+
+        h1 ^= h1 >>> 16;
+        h1 *= 0x85ebca6b;
+        h1 ^= h1 >>> 13;
+        h1 *= 0xc2b2ae35;
+        h1 ^= h1 >>> 16;
+
+        return h1;
+    }
+
+    /**
+     * Performs rejection sampling on a random 32bit Java int (sampled from Integer.MIN_VALUE to Integer.MAX_VALUE).
+     *
+     * @param random int
+     * @param m     integer output range [1,size]
+     * @return the number down-sampled to interval [0, size]. Or -1 if it has to be rejected.
+     */
+    private static int rejectionSample(int random, int m) {
+        random = Math.abs(random);
+        if (random > (2147483647 - 2147483647 % m)
+                || random == Integer.MIN_VALUE)
+            return -1;
+        else
+            return random % m;
+    }
+
+    private static int[] rejectionSample(BiFunction<Integer, byte[], Integer> hashFunction, byte[] value, int m, int k) {
+        int[] hashes = new int[k];
+        int seed = 0;
+        int pos = 0;
+        while (pos < k) {
+            seed = hashFunction.apply(seed, value);
+            int hash = rejectionSample(seed, m);
+            if (hash != -1) {
+                hashes[pos++] = hash;
+            }
+        }
+        return hashes;
+    }
+
+}
diff --git a/src/org/apache/pig/impl/bloom/JenkinsHash.java b/src/org/apache/pig/impl/bloom/JenkinsHash.java
new file mode 100644
index 0000000..5c03741
--- /dev/null
+++ b/src/org/apache/pig/impl/bloom/JenkinsHash.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.bloom;
+
+public class JenkinsHash extends Hash {
+
+    private org.apache.hadoop.util.hash.Hash hashFunction = org.apache.hadoop.util.hash.JenkinsHash.getInstance();
+
+    @Override
+    public int[] hash(byte[] bytes, int maxValue, int nbHash) {
+        int[] result = new int[nbHash];
+        for (int i = 0, initval = 0; i < nbHash; i++) {
+        initval = hashFunction.hash(bytes, initval);
+        result[i] = Math.abs(initval % maxValue);
+        }
+        return result;
+    }
+
+}
diff --git a/src/org/apache/pig/impl/bloom/KirschMitzenmacherHash.java b/src/org/apache/pig/impl/bloom/KirschMitzenmacherHash.java
new file mode 100644
index 0000000..dcb1f3f
--- /dev/null
+++ b/src/org/apache/pig/impl/bloom/KirschMitzenmacherHash.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.bloom;
+
+public class KirschMitzenmacherHash extends Hash {
+
+    @Override
+    public int[] hash(byte[] bytes, int maxValue, int numHash) {
+        return HashProvider.hashKirschMitzenmacher(bytes, maxValue , numHash);
+    }
+
+}
diff --git a/src/org/apache/pig/impl/bloom/Murmur3Hash.java b/src/org/apache/pig/impl/bloom/Murmur3Hash.java
new file mode 100644
index 0000000..e44ca75
--- /dev/null
+++ b/src/org/apache/pig/impl/bloom/Murmur3Hash.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.bloom;
+
+public class Murmur3Hash extends Hash {
+    @Override
+    public int[] hash(byte[] bytes, int maxValue, int numHash) {
+        return HashProvider.hashMurmur3(bytes, maxValue, numHash);
+    }
+}
diff --git a/src/org/apache/pig/impl/util/JarManager.java b/src/org/apache/pig/impl/util/JarManager.java
index e6c9215..0a63658 100644
--- a/src/org/apache/pig/impl/util/JarManager.java
+++ b/src/org/apache/pig/impl/util/JarManager.java
@@ -48,6 +48,7 @@
 import org.apache.hadoop.util.StringUtils;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.org.roaringbitmap.RoaringBitmap;
 import org.apache.tools.bzip2r.BZip2Constants;
 import org.joda.time.DateTime;
 
@@ -66,7 +67,8 @@
         BZIP2R(BZip2Constants.class),
         AUTOMATON(Automaton.class),
         ANTLR(CommonTokenStream.class),
-        JODATIME(DateTime.class);
+        JODATIME(DateTime.class),
+        SHADED_ROARING_BITMAP(RoaringBitmap.class);
 
         private final Class pkgClass;
 
diff --git a/test/org/apache/pig/test/TestJobControlCompiler.java b/test/org/apache/pig/test/TestJobControlCompiler.java
index 2c39964..707db47 100644
--- a/test/org/apache/pig/test/TestJobControlCompiler.java
+++ b/test/org/apache/pig/test/TestJobControlCompiler.java
@@ -130,7 +130,7 @@
     // verifying the jar gets on distributed cache
     Path[] fileClassPaths = DistributedCache.getFileClassPaths(jobConf);
     // guava jar is not shipped with Hadoop 2.x
-    Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), 5, fileClassPaths.length);
+    Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), 6, fileClassPaths.length);
     Path distributedCachePath = fileClassPaths[0];
     Assert.assertEquals("ends with jar name: "+distributedCachePath, distributedCachePath.getName(), tmpFile.getName());
     // hadoop bug requires path to not contain hdfs://hotname in front
@@ -234,11 +234,11 @@
           // 4. another.jar and 5. udf1.jar, and not duplicate udf.jar
           System.out.println("cache.files= " + Arrays.toString(cacheURIs));
           System.out.println("classpath.files= " + Arrays.toString(fileClassPaths));
-          // Default jars - 5 (pig, antlr, joda-time, automaton)
+          // Default jars - 5 (pig, antlr, joda-time, automaton, roaring-bitmap-shaded)
           // Other jars - 10 (udf.jar#udf.jar, udf1.jar#diffname.jar, udf2.jar, udf1.jar, another.jar
-          Assert.assertEquals("size 9 for " + Arrays.toString(cacheURIs), 9,
+          Assert.assertEquals("size 10 for " + Arrays.toString(cacheURIs), 10,
                   Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size());
-          Assert.assertEquals("size 9 for " + Arrays.toString(fileClassPaths), 9,
+          Assert.assertEquals("size 10 for " + Arrays.toString(fileClassPaths), 10,
                   Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size());
           // Count occurrences of the resources
           Map<String, Integer> occurrences = new HashMap<String, Integer>();
@@ -248,7 +248,7 @@
               val = (val == null) ? 1 : ++val;
               occurrences.put(cacheURI.toString(), val);
           }
-          Assert.assertEquals(9, occurrences.size());
+          Assert.assertEquals(10, occurrences.size());
 
           for (String file : occurrences.keySet()) {
               // check that only single occurrence even though we added once to dist cache (simulating via Oozie)