PIG-5255: Improvements to bloom join (satishsaley via rohini)
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1843689 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 9146244..862d9e6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -26,6 +26,8 @@
IMPROVEMENTS
+PIG-5255: Improvements to bloom join (satishsaley via rohini)
+
PIG-5359: Reduce time spent in split serialization (satishsaley via rohini)
PIG-5357: BagFactory interface should support creating a distinct bag from a set (jtolar via rohini)
diff --git a/build.xml b/build.xml
index 2929724..9bb1b12 100644
--- a/build.xml
+++ b/build.xml
@@ -399,6 +399,7 @@
<include name="joda-time-${joda-time.version}.jar"/>
<include name="automaton-${automaton.version}.jar"/>
<include name="jansi-${jansi.version}.jar"/>
+ <include name="RoaringBitmap-shaded-${roaring-bitmap-shaded.version}.jar"/>
</patternset>
</fileset>
@@ -741,6 +742,7 @@
<fileset dir="${ivy.lib.dir}" includes="parser-core-${basjes-httpdlog-pigloader.version}.jar"/>
<fileset dir="${ivy.lib.dir}" includes="ivy-*.jar"/>
<fileset dir="${ivy.lib.dir}" includes="commons-logging-*.jar"/>
+ <fileset dir="${ivy.lib.dir}" includes="RoaringBitmap-shaded-${roaring-bitmap-shaded.version}.jar"/>
</copy>
</target>
@@ -1716,6 +1718,10 @@
<ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" log="${loglevel}"
pattern="${ivy.lib.dir.spark}/[artifact]-[revision](-[classifier]).[ext]" conf="spark${sparkversion},hbase${hbaseversion}"/>
<ivy:cachepath pathid="compile.classpath" conf="compile"/>
+ <exec dir="${basedir}/shade/roaringbitmap" executable="mvn">
+ <arg line="clean package -Droaring.bitmap.version=${roaring-bitmap-shaded.version}"/>
+ </exec>
+ <copy file="${basedir}/shade/roaringbitmap/target/RoaringBitmap-shaded-${roaring-bitmap-shaded.version}.jar" todir="${ivy.lib.dir}"/>
</target>
<target name="ivy-test" depends="ivy-resolve" description="Retrieve Ivy-managed artifacts for test configuration">
diff --git a/ivy.xml b/ivy.xml
index 0902b18..a93da2a 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -235,6 +235,9 @@
<dependency org="org.jruby" name="jruby-complete" rev="${jruby.version}" conf="compile->master"/>
<dependency org="asm" name="asm" rev="${asm.version}" conf="compile->default"/>
+ <!-- Dependencies for bloom join -->
+ <dependency org="org.roaringbitmap" name="RoaringBitmap" rev="${roaring-bitmap-shaded.version}" conf="compile->master"/>
+
<!-- HBase dependency in format for releases higher or equal to 0.95 -->
<dependency org="org.apache.hbase" name="hbase-client" rev="${hbase1.version}" conf="hbase1->master">
<artifact name="hbase-client" type="jar"/>
diff --git a/ivy/libraries.properties b/ivy/libraries.properties
index ec71472..1abc967 100644
--- a/ivy/libraries.properties
+++ b/ivy/libraries.properties
@@ -96,4 +96,5 @@
htrace4.version=4.0.1-incubating
commons-lang3.version=3.6
scala-xml.version=1.0.5
-glassfish.el.version=3.0.1-b08
\ No newline at end of file
+glassfish.el.version=3.0.1-b08
+roaring-bitmap-shaded.version=0.7.14
\ No newline at end of file
diff --git a/shade/roaringbitmap/pom.xml b/shade/roaringbitmap/pom.xml
new file mode 100644
index 0000000..a7ee182
--- /dev/null
+++ b/shade/roaringbitmap/pom.xml
@@ -0,0 +1,80 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache</groupId>
+ <artifactId>apache</artifactId>
+ <version>14</version>
+ </parent>
+
+ <groupId>org.apache.pig</groupId>
+ <artifactId>RoaringBitmap-shaded</artifactId>
+ <packaging>jar</packaging>
+ <version>${roaring.bitmap.version}</version>
+
+ <name>Pig RoaringBitmap</name>
+ <url>http://pig.apache.org</url>
+ <prerequisites>
+ <maven>3.0</maven>
+ </prerequisites>
+
+ <modules>
+ </modules>
+
+ <properties>
+ <maven.shade.plugin.version>2.4.3</maven.shade.plugin.version>
+ <roaring.bitmap.version>0.7.14</roaring.bitmap.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.roaringbitmap</groupId>
+ <artifactId>RoaringBitmap</artifactId>
+ <version>${roaring.bitmap.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>${maven.shade.plugin.version}</version>
+ <executions>
+ <execution>
+ <id>shade-asm</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <relocations>
+ <relocation>
+ <pattern>org.roaringbitmap</pattern>
+ <shadedPattern>org.apache.pig.org.roaringbitmap</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java
index 1d6f784..e64e08f 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java
@@ -22,17 +22,16 @@
import java.io.IOException;
import java.util.Iterator;
-import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
-import org.apache.pig.builtin.BuildBloomBase;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.bloom.BloomFilter;
public class BloomPackager extends Packager {
@@ -103,10 +102,10 @@
Iterator<Tuple> iter = bags[0].iterator();
Tuple tup = iter.next();
DataByteArray bloomBytes = (DataByteArray) tup.get(0);
- BloomFilter bloomFilter = BuildBloomBase.bloomIn(bloomBytes);
+ BloomFilter bloomFilter = BloomFilter.bloomIn(bloomBytes);
while (iter.hasNext()) {
tup = iter.next();
- bloomFilter.or(BuildBloomBase.bloomIn((DataByteArray) tup.get(0)));
+ bloomFilter.or(BloomFilter.bloomIn((DataByteArray) tup.get(0)));
}
Object partition = key;
@@ -160,4 +159,4 @@
public boolean isBloomCreatedInMap() {
return bloomCreatedInMap;
}
-}
\ No newline at end of file
+}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java
index 82b599d..ebedc69 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java
@@ -24,7 +24,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
@@ -32,12 +31,12 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.ObjectCache;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
-import org.apache.pig.builtin.BuildBloomBase;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.bloom.BloomFilter;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.tez.runtime.api.LogicalInput;
@@ -106,7 +105,7 @@
}
Tuple val = (Tuple) reader.getCurrentValue();
int index = (int) val.get(0);
- bloomFilters[index] = BuildBloomBase.bloomIn((DataByteArray) val.get(1));
+ bloomFilters[index] = BloomFilter.bloomIn((DataByteArray) val.get(1));
}
ObjectCache.getInstance().cache(cacheKey, bloomFilters);
} catch (Exception e) {
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java
index 4045942..545f136 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java
@@ -25,7 +25,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.executionengine.ExecException;
@@ -37,6 +36,7 @@
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.bloom.BloomFilter;
import org.apache.pig.impl.io.NullableBytesWritable;
import org.apache.pig.impl.io.NullableIntWritable;
import org.apache.pig.impl.io.NullableTuple;
diff --git a/src/org/apache/pig/impl/bloom/BloomFilter.java b/src/org/apache/pig/impl/bloom/BloomFilter.java
new file mode 100644
index 0000000..1487d1e
--- /dev/null
+++ b/src/org/apache/pig/impl/bloom/BloomFilter.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.bloom;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.BZip2Codec;
+import org.apache.hadoop.io.compress.CompressionInputStream;
+import org.apache.hadoop.io.compress.CompressionOutputStream;
+import org.apache.hadoop.util.bloom.Filter;
+import org.apache.hadoop.util.bloom.Key;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.org.roaringbitmap.RoaringBitmap;
+
+public class BloomFilter extends Filter {
+ private static final Log LOG = LogFactory.getLog(BloomFilter.class);
+ private static final int VERSION = 1;
+ private HashFunction hash;
+ private RoaringBitmap bitmap;
+ private int hashAlgorithm;
+
+ /**
+ * Read the fields using bloomIn
+ */
+ public BloomFilter() {
+ super();
+ }
+
+ public BloomFilter(int vectorSize, int nbHash, int hashAlgorithm) {
+ super.vectorSize = vectorSize;
+ super.nbHash = nbHash;
+ this.hashAlgorithm = hashAlgorithm;
+ this.hash = new HashFunction(vectorSize, nbHash, hashAlgorithm);
+ this.bitmap = new RoaringBitmap();
+ }
+
+ @Override
+ public void add(Key key) {
+ if(key == null) {
+ throw new NullPointerException("key cannot be null");
+ }
+ int[] h = hash.hash(key);
+ hash.clear();
+ Arrays.sort(h);
+ this.bitmap.or(RoaringBitmap.bitmapOf(h));
+ }
+
+ @Override
+ public boolean membershipTest(Key key) {
+ if(key == null) {
+ throw new NullPointerException("key cannot be null");
+ }
+ int[] h = hash.hash(key);
+ hash.clear();
+ for(int i = 0; i < nbHash; i++) {
+ if(!this.bitmap.contains(h[i])) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public void and(Filter filter) {
+ if(filter == null
+ || !(filter instanceof BloomFilter)
+ || ((BloomFilter)filter).vectorSize != this.vectorSize
+ || ((BloomFilter)filter).nbHash != this.nbHash) {
+ throw new IllegalArgumentException("filters cannot be and-ed");
+ }
+ this.bitmap.and(((BloomFilter)filter).bitmap);
+ }
+
+ @Override
+ public void or(Filter filter) {
+ if(filter == null
+ || !(filter instanceof BloomFilter)
+ || ((BloomFilter)filter).vectorSize != this.vectorSize
+ || ((BloomFilter)filter).nbHash != this.nbHash) {
+ throw new IllegalArgumentException("filters cannot be or-ed");
+ }
+ this.bitmap.or(((BloomFilter)filter).bitmap);
+ }
+
+ @Override
+ public void xor(Filter filter) {
+ if(filter == null
+ || !(filter instanceof BloomFilter)
+ || ((BloomFilter)filter).vectorSize != this.vectorSize
+ || ((BloomFilter)filter).nbHash != this.nbHash) {
+ throw new IllegalArgumentException("filters cannot be xor-ed");
+ }
+ this.bitmap.xor(((BloomFilter)filter).bitmap);
+ }
+
+ @Override
+ public void not() {
+ this.bitmap.flip(0, vectorSize);
+ }
+
+ @Override
+ public String toString() {
+ return this.bitmap.toString();
+ }
+
+ /**
+ * @return size of the the bloomfilter
+ */
+ public int getVectorSize() {
+ return this.vectorSize;
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(VERSION);
+ out.writeInt(this.nbHash);
+ out.writeByte(this.hashAlgorithm);
+ out.writeInt(this.vectorSize);
+ this.bitmap.runOptimize();
+ ByteArrayOutputStream bos = compressBitmap();
+ LOG.info("Compressed bitmap from " + String.format("%,8d bytes", this.bitmap.getSizeInBytes())
+ + " to "+ String.format("%,8d bytes", bos.size()));
+ out.writeInt(bos.size());
+ out.write(bos.toByteArray());
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ int ver = in.readInt();
+ if (ver == VERSION) {
+ this.nbHash = in.readInt();
+ this.hashAlgorithm = in.readByte();
+ } else {
+ throw new IOException("Unsupported version: " + ver);
+ }
+ this.vectorSize = in.readInt();
+ this.hash = new HashFunction(this.vectorSize, this.nbHash, this.hashAlgorithm);
+ this.bitmap = new RoaringBitmap();
+ int compressedSize = in.readInt();
+ byte[] buf = new byte[compressedSize];
+ in.readFully(buf);
+ this.bitmap.deserialize(decompressBitmap(buf));
+ }
+
+ public static BloomFilter bloomIn(DataByteArray b) throws IOException {
+ DataInputStream dis = new DataInputStream(new
+ ByteArrayInputStream(b.get()));
+ BloomFilter f = new BloomFilter();
+ f.readFields(dis);
+ return f;
+ }
+ private ByteArrayOutputStream compressBitmap() throws IOException {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ BZip2Codec bzip = new BZip2Codec();
+ bzip.setConf(new Configuration(false));
+ CompressionOutputStream compressionOut = bzip.createOutputStream(bos);
+ DataOutputStream dos = new DataOutputStream(compressionOut);
+ this.bitmap.serialize(dos);
+ compressionOut.finish();
+ dos.flush();
+ return bos;
+ }
+
+ private DataInput decompressBitmap(byte[] buffer) throws IOException {
+ ByteArrayInputStream deCompressedDataBuffer = new ByteArrayInputStream(buffer, 0, buffer.length);
+ BZip2Codec bzip = new BZip2Codec();
+ bzip.setConf(new Configuration(false));
+ CompressionInputStream compressionIn = bzip.createInputStream(deCompressedDataBuffer);
+ DataInputStream inflateIn = new DataInputStream(compressionIn);
+ return inflateIn;
+ }
+}
diff --git a/src/org/apache/pig/impl/bloom/Hash.java b/src/org/apache/pig/impl/bloom/Hash.java
new file mode 100644
index 0000000..180c1dc
--- /dev/null
+++ b/src/org/apache/pig/impl/bloom/Hash.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.bloom;
+
+public abstract class Hash {
+ public static final int MURMUR = 1;
+ public static final int MURMUR3 = MURMUR;
+ public static final int MURMUR3KM = 2;
+ public static final int JENKINS = 3;
+
+ public static int parseHashType(String hashType) {
+ if("murmur".equalsIgnoreCase(hashType)) {
+ return MURMUR3;
+ } else if("murmur3km".equalsIgnoreCase(hashType)) {
+ return MURMUR3KM;
+ } else if("jenkins".equalsIgnoreCase(hashType)) {
+ return JENKINS;
+ }
+ throw new IllegalArgumentException("Hash Algorithm values must be one of - murmur, murmur3km, jenkins");
+ }
+
+ public static Hash getInstance(int hashType) {
+ switch (hashType) {
+ case MURMUR:
+ return new Murmur3Hash();
+ case MURMUR3KM:
+ return new KirschMitzenmacherHash();
+ case JENKINS:
+ return new JenkinsHash();
+ }
+ throw new IllegalArgumentException("Hash type values must be one of - 1 (murmur), 2 (murmur3km), 3 (jenkins)");
+ }
+
+ /**
+ * @param bytes
+ * @param maxValue The maximum hashed value
+ * @param nbHash The number of hashed values
+ * @return
+ */
+ public abstract int[] hash(byte[] bytes, int maxValue, int nbHash);
+
+}
diff --git a/src/org/apache/pig/impl/bloom/HashFunction.java b/src/org/apache/pig/impl/bloom/HashFunction.java
new file mode 100644
index 0000000..95cde97
--- /dev/null
+++ b/src/org/apache/pig/impl/bloom/HashFunction.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.bloom;
+
+import org.apache.hadoop.util.bloom.Key;
+
+public class HashFunction {
+ /** The number of hashed values. */
+ protected int nbHash;
+ /** The maximum highest returned value. */
+ protected int maxValue;
+ /** Hashing algorithm to use. */
+ protected Hash hashAlgorithm;
+ /**
+ * Constructor.
+ * <p>
+ * Builds a hash function that must obey to a given maximum number of returned values and a highest value.
+ * @param maxValue The maximum highest returned value.
+ * @param nbHash The number of resulting hashed values.
+ * @param hashAlgorithm type of the hashing algorithm (see {@link Hash}).
+ */
+ public HashFunction(int maxValue, int nbHash, int hashAlgorithm) {
+ if (maxValue <= 0) {
+ throw new IllegalArgumentException("maxValue must be > 0");
+ }
+ if (nbHash <= 0) {
+ throw new IllegalArgumentException("nbHash must be > 0");
+ }
+
+ this.maxValue = maxValue;
+ this.nbHash = nbHash;
+ this.hashAlgorithm = Hash.getInstance(hashAlgorithm);
+ }
+
+ /** Clears <i>this</i> hash function. A NOOP */
+ public void clear() {
+ }
+
+ /**
+ * Hashes a specified key into several integers.
+ * @param k The specified key.
+ * @return The array of hashed values.
+ */
+ public int[] hash(Key k){
+ byte[] b = k.getBytes();
+ if (b == null) {
+ throw new NullPointerException("buffer reference is null");
+ }
+ if (b.length == 0) {
+ throw new IllegalArgumentException("key length must be > 0");
+ }
+ return hashAlgorithm.hash(b, maxValue, nbHash);
+ }
+}
\ No newline at end of file
diff --git a/src/org/apache/pig/impl/bloom/HashProvider.java b/src/org/apache/pig/impl/bloom/HashProvider.java
new file mode 100644
index 0000000..48a811b
--- /dev/null
+++ b/src/org/apache/pig/impl/bloom/HashProvider.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/*
+ * Copyright Felix Gessert and Florian Bücklers. All rights reserved. Permission is hereby granted,
+ * free of charge, to any person obtaining a copy of this software and associated documentation files
+ * (the "Software"), to deal in the Software without restriction, including without limitation the
+ * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the
+ * Software, and to permit persons to whom the Software is furnished to do so, subject to the following
+ * conditions:
+ * The above copyright notice and this permission notice shall be included in all copies or substantial
+ * portions of the Software.
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
+ * NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+ * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+ * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ */
+package org.apache.pig.impl.bloom;
+
+import java.util.function.BiFunction;
+
+/**
+ * Taken from https://github.com/Baqend/Orestes-Bloomfilter
+ */
+public class HashProvider {
+
+ public static int[] hashMurmur3(byte[] value, int m, int k) {
+ return rejectionSample(HashProvider::murmur3_signed, value, m, k);
+ }
+
+ public static int[] hashKirschMitzenmacher(byte[] value, int m, int k) {
+ int[] result = new int[k];
+ long hash1 = murmur3(0, value);
+ long hash2 = murmur3((int) hash1, value);
+ for (int i = 0; i < k; i++) {
+ result[i] = (int) ((hash1 + i * hash2) % m);
+ }
+ return result;
+ }
+
+ private static long murmur3(int seed, byte[] bytes) {
+ return Integer.toUnsignedLong(murmur3_signed(seed, bytes));
+ }
+
+ private static int murmur3_signed(int seed, byte[] bytes) {
+ int h1 = seed;
+ //Standard in Guava
+ int c1 = 0xcc9e2d51;
+ int c2 = 0x1b873593;
+ int len = bytes.length;
+ int i = 0;
+
+ while (len >= 4) {
+ //process()
+ int k1 = (bytes[i++] & 0xFF);
+ k1 |= (bytes[i++] & 0xFF) << 8;
+ k1 |= (bytes[i++] & 0xFF) << 16;
+ k1 |= (bytes[i++] & 0xFF) << 24;
+
+ k1 *= c1;
+ k1 = Integer.rotateLeft(k1, 15);
+ k1 *= c2;
+
+ h1 ^= k1;
+ h1 = Integer.rotateLeft(h1, 13);
+ h1 = h1 * 5 + 0xe6546b64;
+
+ len -= 4;
+ }
+
+ //processingRemaining()
+ int k1 = 0;
+ switch (len) {
+ case 3:
+ k1 ^= (bytes[i + 2] & 0xFF) << 16;
+ // fall through
+ case 2:
+ k1 ^= (bytes[i + 1] & 0xFF) << 8;
+ // fall through
+ case 1:
+ k1 ^= (bytes[i] & 0xFF);
+
+ k1 *= c1;
+ k1 = Integer.rotateLeft(k1, 15);
+ k1 *= c2;
+ h1 ^= k1;
+ }
+ i += len;
+
+ //makeHash()
+ h1 ^= i;
+
+ h1 ^= h1 >>> 16;
+ h1 *= 0x85ebca6b;
+ h1 ^= h1 >>> 13;
+ h1 *= 0xc2b2ae35;
+ h1 ^= h1 >>> 16;
+
+ return h1;
+ }
+
+ /**
+ * Performs rejection sampling on a random 32bit Java int (sampled from Integer.MIN_VALUE to Integer.MAX_VALUE).
+ *
+ * @param random int
+ * @param m integer output range [1,size]
+ * @return the number down-sampled to interval [0, size]. Or -1 if it has to be rejected.
+ */
+ private static int rejectionSample(int random, int m) {
+ random = Math.abs(random);
+ if (random > (2147483647 - 2147483647 % m)
+ || random == Integer.MIN_VALUE)
+ return -1;
+ else
+ return random % m;
+ }
+
+ private static int[] rejectionSample(BiFunction<Integer, byte[], Integer> hashFunction, byte[] value, int m, int k) {
+ int[] hashes = new int[k];
+ int seed = 0;
+ int pos = 0;
+ while (pos < k) {
+ seed = hashFunction.apply(seed, value);
+ int hash = rejectionSample(seed, m);
+ if (hash != -1) {
+ hashes[pos++] = hash;
+ }
+ }
+ return hashes;
+ }
+
+}
diff --git a/src/org/apache/pig/impl/bloom/JenkinsHash.java b/src/org/apache/pig/impl/bloom/JenkinsHash.java
new file mode 100644
index 0000000..5c03741
--- /dev/null
+++ b/src/org/apache/pig/impl/bloom/JenkinsHash.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.bloom;
+
+public class JenkinsHash extends Hash {
+
+ private org.apache.hadoop.util.hash.Hash hashFunction = org.apache.hadoop.util.hash.JenkinsHash.getInstance();
+
+ @Override
+ public int[] hash(byte[] bytes, int maxValue, int nbHash) {
+ int[] result = new int[nbHash];
+ for (int i = 0, initval = 0; i < nbHash; i++) {
+ initval = hashFunction.hash(bytes, initval);
+ result[i] = Math.abs(initval % maxValue);
+ }
+ return result;
+ }
+
+}
diff --git a/src/org/apache/pig/impl/bloom/KirschMitzenmacherHash.java b/src/org/apache/pig/impl/bloom/KirschMitzenmacherHash.java
new file mode 100644
index 0000000..dcb1f3f
--- /dev/null
+++ b/src/org/apache/pig/impl/bloom/KirschMitzenmacherHash.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.bloom;
+
+public class KirschMitzenmacherHash extends Hash {
+
+ @Override
+ public int[] hash(byte[] bytes, int maxValue, int numHash) {
+ return HashProvider.hashKirschMitzenmacher(bytes, maxValue , numHash);
+ }
+
+}
diff --git a/src/org/apache/pig/impl/bloom/Murmur3Hash.java b/src/org/apache/pig/impl/bloom/Murmur3Hash.java
new file mode 100644
index 0000000..e44ca75
--- /dev/null
+++ b/src/org/apache/pig/impl/bloom/Murmur3Hash.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.bloom;
+
+public class Murmur3Hash extends Hash {
+ @Override
+ public int[] hash(byte[] bytes, int maxValue, int numHash) {
+ return HashProvider.hashMurmur3(bytes, maxValue, numHash);
+ }
+}
diff --git a/src/org/apache/pig/impl/util/JarManager.java b/src/org/apache/pig/impl/util/JarManager.java
index e6c9215..0a63658 100644
--- a/src/org/apache/pig/impl/util/JarManager.java
+++ b/src/org/apache/pig/impl/util/JarManager.java
@@ -48,6 +48,7 @@
import org.apache.hadoop.util.StringUtils;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.org.roaringbitmap.RoaringBitmap;
import org.apache.tools.bzip2r.BZip2Constants;
import org.joda.time.DateTime;
@@ -66,7 +67,8 @@
BZIP2R(BZip2Constants.class),
AUTOMATON(Automaton.class),
ANTLR(CommonTokenStream.class),
- JODATIME(DateTime.class);
+ JODATIME(DateTime.class),
+ SHADED_ROARING_BITMAP(RoaringBitmap.class);
private final Class pkgClass;
diff --git a/test/org/apache/pig/test/TestJobControlCompiler.java b/test/org/apache/pig/test/TestJobControlCompiler.java
index 2c39964..707db47 100644
--- a/test/org/apache/pig/test/TestJobControlCompiler.java
+++ b/test/org/apache/pig/test/TestJobControlCompiler.java
@@ -130,7 +130,7 @@
// verifying the jar gets on distributed cache
Path[] fileClassPaths = DistributedCache.getFileClassPaths(jobConf);
// guava jar is not shipped with Hadoop 2.x
- Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), 5, fileClassPaths.length);
+ Assert.assertEquals("size for "+Arrays.toString(fileClassPaths), 6, fileClassPaths.length);
Path distributedCachePath = fileClassPaths[0];
Assert.assertEquals("ends with jar name: "+distributedCachePath, distributedCachePath.getName(), tmpFile.getName());
// hadoop bug requires path to not contain hdfs://hotname in front
@@ -234,11 +234,11 @@
// 4. another.jar and 5. udf1.jar, and not duplicate udf.jar
System.out.println("cache.files= " + Arrays.toString(cacheURIs));
System.out.println("classpath.files= " + Arrays.toString(fileClassPaths));
- // Default jars - 5 (pig, antlr, joda-time, automaton)
+ // Default jars - 5 (pig, antlr, joda-time, automaton, roaring-bitmap-shaded)
// Other jars - 10 (udf.jar#udf.jar, udf1.jar#diffname.jar, udf2.jar, udf1.jar, another.jar
- Assert.assertEquals("size 9 for " + Arrays.toString(cacheURIs), 9,
+ Assert.assertEquals("size 10 for " + Arrays.toString(cacheURIs), 10,
Arrays.asList(StringUtils.join(cacheURIs, ",").split(",")).size());
- Assert.assertEquals("size 9 for " + Arrays.toString(fileClassPaths), 9,
+ Assert.assertEquals("size 10 for " + Arrays.toString(fileClassPaths), 10,
Arrays.asList(StringUtils.join(fileClassPaths, ",").split(",")).size());
// Count occurrences of the resources
Map<String, Integer> occurrences = new HashMap<String, Integer>();
@@ -248,7 +248,7 @@
val = (val == null) ? 1 : ++val;
occurrences.put(cacheURI.toString(), val);
}
- Assert.assertEquals(9, occurrences.size());
+ Assert.assertEquals(10, occurrences.size());
for (String file : occurrences.keySet()) {
// check that only single occurrence even though we added once to dist cache (simulating via Oozie)