blob: eb8a612cf3382483b319c54053038ccad5ef50bf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.NullableIntWritable;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.library.api.KeyValueWriter;
/**
* This operator writes out the key value for the hash join reduce operation similar to POLocalRearrangeTez.
* In addition, it also writes out the bloom filter constructed from the join keys
* in the case of bloomjoin map strategy or join keys themselves in case of reduce strategy.
*
* Using multiple bloom filters partitioned by the hash of the key allows for parallelism.
* It also allows us to have lower false positives with smaller vector sizes.
*
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class POBuildBloomRearrangeTez extends POLocalRearrangeTez {
private static final long serialVersionUID = 1L;
private static final Log LOG = LogFactory.getLog(POBuildBloomRearrangeTez.class);
public static final String DEFAULT_BLOOM_STRATEGY = "map";
public static final int DEFAULT_NUM_BLOOM_FILTERS_REDUCE = 11;
public static final int DEFAULT_NUM_BLOOM_HASH_FUNCTIONS = 3;
public static final String DEFAULT_BLOOM_HASH_TYPE = "murmur";
public static final int DEFAULT_BLOOM_VECTOR_SIZE_BYTES = 1024 * 1024;
private String bloomOutputKey;
private boolean skipNullKeys = false;
private boolean createBloomInMap;
private int numBloomFilters;
private int vectorSizeBytes;
private int numHash;
private int hashType;
private transient BloomFilter[] bloomFilters;
private transient KeyValueWriter bloomWriter;
private transient PigNullableWritable nullKey;
private transient Tuple bloomValue;
private transient NullableTuple bloomNullableTuple;
public POBuildBloomRearrangeTez(POLocalRearrangeTez lr,
boolean createBloomInMap, int numBloomFilters, int vectorSizeBytes,
int numHash, int hashType) {
super(lr);
this.createBloomInMap = createBloomInMap;
this.numBloomFilters = numBloomFilters;
this.vectorSizeBytes = vectorSizeBytes;
this.numHash = numHash;
this.hashType = hashType;
}
public static int getNumBloomFilters(Configuration conf) {
if ("map".equals(conf.get(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, DEFAULT_BLOOM_STRATEGY))) {
return conf.getInt(PigConfiguration.PIG_BLOOMJOIN_NUM_FILTERS, 1);
} else {
return conf.getInt(PigConfiguration.PIG_BLOOMJOIN_NUM_FILTERS, DEFAULT_NUM_BLOOM_FILTERS_REDUCE);
}
}
public void setSkipNullKeys(boolean skipNullKeys) {
this.skipNullKeys = skipNullKeys;
}
public void setBloomOutputKey(String bloomOutputKey) {
this.bloomOutputKey = bloomOutputKey;
}
@Override
public boolean containsOutputKey(String key) {
if(super.containsOutputKey(key)) {
return true;
}
return bloomOutputKey.equals(key);
}
@Override
public String[] getTezOutputs() {
return new String[] { outputKey, bloomOutputKey };
}
@Override
public void replaceOutput(String oldOutputKey, String newOutputKey) {
if (oldOutputKey.equals(outputKey)) {
outputKey = newOutputKey;
} else if (oldOutputKey.equals(bloomOutputKey)) {
bloomOutputKey = newOutputKey;
}
}
@Override
public void attachOutputs(Map<String, LogicalOutput> outputs,
Configuration conf) throws ExecException {
super.attachOutputs(outputs, conf);
LogicalOutput output = outputs.get(bloomOutputKey);
if (output == null) {
throw new ExecException("Output to vertex " + bloomOutputKey + " is missing");
}
try {
bloomWriter = (KeyValueWriter) output.getWriter();
LOG.info("Attached output to vertex " + bloomOutputKey + " : output=" + output + ", writer=" + bloomWriter);
} catch (Exception e) {
throw new ExecException(e);
}
bloomFilters = new BloomFilter[numBloomFilters];
bloomValue = mTupleFactory.newTuple(1);
bloomNullableTuple = new NullableTuple(bloomValue);
}
@Override
public Result getNextTuple() throws ExecException {
PigNullableWritable key;
while (true) {
res = super.getRearrangedTuple();
try {
switch (res.returnStatus) {
case POStatus.STATUS_OK:
if (illustrator == null) {
Tuple result = (Tuple) res.result;
Byte index = (Byte) result.get(0);
Object keyObj = result.get(1);
if (keyObj != null) {
key = HDataType.getWritableComparableTypes(keyObj, keyType);
// null keys cannot be part of bloom filter
// Since they are also dropped during join we can skip them
if (createBloomInMap) {
addKeyToBloomFilter(keyObj);
} else {
writeJoinKeyForBloom(keyObj);
}
} else if (skipNullKeys) {
// Inner join. So don't bother writing null key
continue;
} else {
if (nullKey == null) {
nullKey = HDataType.getWritableComparableTypes(keyObj, keyType);
}
key = nullKey;
}
NullableTuple val = new NullableTuple((Tuple)result.get(2));
key.setIndex(index);
val.setIndex(index);
writer.write(key, val);
} else {
illustratorMarkup(res.result, res.result, 0);
}
continue;
case POStatus.STATUS_NULL:
continue;
case POStatus.STATUS_EOP:
if (this.parentPlan.endOfAllInput && createBloomInMap) {
// In case of Split will get EOP after every record.
// So check for endOfAllInput
writeBloomFilters();
}
case POStatus.STATUS_ERR:
default:
return res;
}
} catch (IOException ioe) {
int errCode = 2135;
String msg = "Received error from POBuildBloomRearrage function." + ioe.getMessage();
throw new ExecException(msg, errCode, ioe);
}
}
}
private void addKeyToBloomFilter(Object key) throws ExecException {
Key k = new Key(DataType.toBytes(key, keyType));
if (bloomFilters.length == 1) {
if (bloomFilters[0] == null) {
bloomFilters[0] = new BloomFilter(vectorSizeBytes * 8, numHash, hashType);
}
bloomFilters[0].add(k);
} else {
int partition = (key.hashCode() & Integer.MAX_VALUE) % numBloomFilters;
BloomFilter filter = bloomFilters[partition];
if (filter == null) {
filter = new BloomFilter(vectorSizeBytes * 8, numHash, hashType);
bloomFilters[partition] = filter;
}
filter.add(k);
}
}
private void writeJoinKeyForBloom(Object key) throws IOException {
int partition = (key.hashCode() & Integer.MAX_VALUE) % numBloomFilters;
bloomValue.set(0, key);
bloomWriter.write(new NullableIntWritable(partition), bloomNullableTuple);
}
private void writeBloomFilters() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream(vectorSizeBytes + 64);
for (int i = 0; i < bloomFilters.length; i++) {
if (bloomFilters[i] != null) {
DataOutputStream dos = new DataOutputStream(baos);
bloomFilters[i].write(dos);
dos.flush();
bloomValue.set(0, new DataByteArray(baos.toByteArray()));
bloomWriter.write(new NullableIntWritable(i), bloomNullableTuple);
baos.reset();
}
}
}
@Override
public POBuildBloomRearrangeTez clone() throws CloneNotSupportedException {
return (POBuildBloomRearrangeTez) super.clone();
}
@Override
public String name() {
return getAliasString() + "BuildBloom Rearrange" + "["
+ DataType.findTypeName(resultType) + "]" + "{"
+ DataType.findTypeName(keyType) + "}" + "(" + mIsDistinct
+ ") - " + mKey.toString() + "\t->\t[ " + outputKey + ", " + bloomOutputKey +"]";
}
}