blob: bd0124bf223dda1cf8dc1d695ef66923e95f5b36 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.builtin;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.Locale;

import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.hash.Hash;
import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
/**
 * A base class for BuildBloom and its Algebraic implementations.
 *
 * <p>Holds the Bloom filter parameters (vector size, number of hash
 * functions, hash type) and provides helpers to OR together serialized
 * partial filters and to (de)serialize a {@link BloomFilter} to/from a
 * {@link DataByteArray}.
 */
public abstract class BuildBloomBase<T> extends EvalFunc<T> {
    /** Number of bits in the Bloom filter vector. */
    protected int vSize;
    /** Number of hash functions applied to each key. */
    protected int numHash;
    /** Hadoop hash type constant ({@link Hash#JENKINS_HASH} or {@link Hash#MURMUR_HASH}). */
    protected int hType;
    /** The filter being built; {@link #bloomOr(Tuple)} replaces it and {@link #bloomOut()} serializes it. */
    protected BloomFilter filter;

    /** No-arg constructor; leaves all filter parameters unset (zero). */
    protected BuildBloomBase() {
    }

    /**
     * Configures the filter with an explicit vector size and hash count.
     *
     * @param hashType type of the hashing function (see
     * {@link org.apache.hadoop.util.hash.Hash}); matched case-insensitively,
     * any value containing "jenkins" or "murmur" is accepted.
     * @param mode Will be ignored, though by convention it should be
     * "fixed" or "fixedsize"
     * @param vectorSize The vector size of <i>this</i> filter.
     * @param nbHash The number of hash functions to consider.
     * @throws NumberFormatException if vectorSize or nbHash is not an integer
     * @throws IllegalArgumentException if hashType is not recognized
     */
    public BuildBloomBase(String hashType,
                          String mode,
                          String vectorSize,
                          String nbHash) {
        vSize = Integer.parseInt(vectorSize);
        numHash = Integer.parseInt(nbHash);
        hType = convertHashType(hashType);
    }

    /**
     * Configures the filter by deriving vector size and hash count from the
     * expected number of elements and the acceptable false-positive rate.
     *
     * @param hashType type of the hashing function (see
     * {@link org.apache.hadoop.util.hash.Hash}); matched case-insensitively,
     * any value containing "jenkins" or "murmur" is accepted.
     * @param numElements The number of distinct elements expected to be
     * placed in this filter. Must be at least 1.
     * @param desiredFalsePositive the acceptable rate of false positives.
     * This must be a floating point value strictly between 0 and 1.0, where
     * values close to 1.0 give a nearly useless filter and 0 is unattainable.
     * @throws NumberFormatException if a numeric argument cannot be parsed
     * @throws IllegalArgumentException if an argument is out of range or
     * hashType is not recognized
     */
    public BuildBloomBase(String hashType,
                          String numElements,
                          String desiredFalsePositive) {
        setSize(numElements, desiredFalsePositive);
        hType = convertHashType(hashType);
    }

    /**
     * Builds a fresh filter from this instance's parameters and ORs into it
     * every serialized filter found in the first field of {@code input}.
     *
     * <p>NOTE(review): each partial filter presumably must share this
     * instance's vector size, hash count and hash type, since Hadoop's
     * {@code Filter.or} rejects mismatched filters; verify against callers.
     *
     * @param input a tuple whose field 0 is a bag of single-field tuples,
     * each holding a serialized filter as a {@link DataByteArray}
     * @return the combined filter, serialized
     * @throws IOException if a field cannot be read or a filter cannot be
     * (de)serialized
     */
    protected DataByteArray bloomOr(Tuple input) throws IOException {
        filter = new BloomFilter(vSize, numHash, hType);
        try {
            DataBag values = (DataBag) input.get(0);
            for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
                Tuple t = it.next();
                filter.or(bloomIn((DataByteArray) t.get(0)));
            }
        } catch (ExecException ee) {
            throw new IOException(ee);
        }
        return bloomOut();
    }

    /**
     * Serializes {@link #filter} using its Writable encoding.
     *
     * @return the serialized filter bytes
     * @throws IOException if the filter cannot be written
     */
    protected DataByteArray bloomOut() throws IOException {
        // Pre-size roughly for the bit vector; the buffer still grows as needed.
        ByteArrayOutputStream baos = new ByteArrayOutputStream(Math.max(vSize / 8, 0));
        DataOutputStream dos = new DataOutputStream(baos);
        filter.write(dos);
        dos.flush();
        return new DataByteArray(baos.toByteArray());
    }

    /**
     * Deserializes a filter previously produced by {@link #bloomOut()}.
     *
     * @param b the serialized filter bytes
     * @return the reconstructed filter
     * @throws IOException if the bytes cannot be decoded
     */
    public static BloomFilter bloomIn(DataByteArray b) throws IOException {
        DataInputStream dis = new DataInputStream(
                new ByteArrayInputStream(b.get()));
        BloomFilter f = new BloomFilter();
        f.readFields(dis);
        return f;
    }

    /**
     * Maps a user-supplied hash name to a Hadoop {@link Hash} constant.
     * Lower-casing uses {@link Locale#ROOT} so that names such as "JENKINS"
     * are recognized regardless of the JVM default locale (e.g. Turkish,
     * where 'I' lower-cases to a dotless i).
     */
    private int convertHashType(String hashType) {
        String normalized = hashType.toLowerCase(Locale.ROOT);
        if (normalized.contains("jenkins")) {
            return Hash.JENKINS_HASH;
        } else if (normalized.contains("murmur")) {
            return Hash.MURMUR_HASH;
        }
        throw new IllegalArgumentException("Unknown hash type " + hashType +
                ". Valid values are jenkins and murmur.");
    }

    /**
     * Derives {@link #vSize} and {@link #numHash} from the expected element
     * count n and false-positive rate p using the standard formulas
     * m = -n ln(p) / (ln 2)^2 and k = (m / n) ln 2 (approximated as 0.7).
     */
    private void setSize(String numElements, String desiredFalsePositive) {
        int num = Integer.parseInt(numElements);
        float fp = Float.parseFloat(desiredFalsePositive);
        // fp == 0 makes log(fp) infinite and NaN slips past ordinary
        // comparisons, so only the open interval (0, 1) is accepted.
        if (num < 1 || !(fp > 0.0f && fp < 1.0f)) {
            throw new IllegalArgumentException("Number of elements must be greater "
                    + "than zero and desiredFalsePositive must be between 0 "
                    + "and 1 (exclusive).");
        }
        // Clamp to at least one bit / one hash: tiny n with p near 1 would
        // otherwise truncate to 0, which Hadoop's HashFunction rejects.
        vSize = Math.max(1,
                (int) (-1 * (num * Math.log(fp)) / Math.pow(Math.log(2), 2)));
        log.info("BuildBloom setting vector size to " + vSize);
        numHash = Math.max(1, (int) (0.7 * vSize / num));
        log.info("BuildBloom setting number of hashes to " + numHash);
    }
}