blob: 82b599daeb3392368ae1a6984f92a856792efe57 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.HDataType;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.ObjectCache;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
import org.apache.pig.builtin.BuildBloomBase;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.NullableTuple;
import org.apache.pig.impl.io.PigNullableWritable;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.library.api.KeyValueReader;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class POBloomFilterRearrangeTez extends POLocalRearrangeTez implements TezInput {
private static final long serialVersionUID = 1L;
private static final Log LOG = LogFactory.getLog(POBloomFilterRearrangeTez.class);
private String inputKey;
private transient KeyValueReader reader;
private transient String cacheKey;
private int numBloomFilters;
private transient BloomFilter[] bloomFilters;
public POBloomFilterRearrangeTez(POLocalRearrangeTez lr, int numBloomFilters) {
super(lr);
this.numBloomFilters = numBloomFilters;
}
public void setInputKey(String inputKey) {
this.inputKey = inputKey;
}
@Override
public String[] getTezInputs() {
return new String[] { inputKey };
}
@Override
public void replaceInput(String oldInputKey, String newInputKey) {
if (oldInputKey.equals(inputKey)) {
inputKey = newInputKey;
}
}
@Override
public void addInputsToSkip(Set<String> inputsToSkip) {
cacheKey = "bloom-" + inputKey;
Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
if (cacheValue != null) {
inputsToSkip.add(inputKey);
}
}
@Override
public void attachInputs(Map<String, LogicalInput> inputs,
Configuration conf) throws ExecException {
Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
if (cacheValue != null) {
bloomFilters = (BloomFilter[]) cacheValue;
return;
}
LogicalInput input = inputs.get(inputKey);
if (input == null) {
throw new ExecException("Input from vertex " + inputKey + " is missing");
}
try {
reader = (KeyValueReader) input.getReader();
LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + reader);
while (reader.next()) {
if (bloomFilters == null) {
bloomFilters = new BloomFilter[numBloomFilters];
}
Tuple val = (Tuple) reader.getCurrentValue();
int index = (int) val.get(0);
bloomFilters[index] = BuildBloomBase.bloomIn((DataByteArray) val.get(1));
}
ObjectCache.getInstance().cache(cacheKey, bloomFilters);
} catch (Exception e) {
throw new ExecException(e);
}
}
@Override
public Result getNextTuple() throws ExecException {
// If there is no bloom filter, then it means right input was empty
// Skip processing
if (bloomFilters == null) {
return RESULT_EOP;
}
while (true) {
res = super.getRearrangedTuple();
try {
switch (res.returnStatus) {
case POStatus.STATUS_OK:
if (illustrator == null) {
Tuple result = (Tuple) res.result;
Byte index = (Byte) result.get(0);
// Skip the record if key is not in the bloom filter
if (!isKeyInBloomFilter(result.get(1))) {
continue;
}
PigNullableWritable key = HDataType.getWritableComparableTypes(result.get(1), keyType);
NullableTuple val = new NullableTuple((Tuple)result.get(2));
key.setIndex(index);
val.setIndex(index);
writer.write(key, val);
} else {
illustratorMarkup(res.result, res.result, 0);
}
continue;
case POStatus.STATUS_NULL:
continue;
case POStatus.STATUS_EOP:
case POStatus.STATUS_ERR:
default:
return res;
}
} catch (IOException ioe) {
int errCode = 2135;
String msg = "Received error from POBloomFilterRearrage function." + ioe.getMessage();
throw new ExecException(msg, errCode, ioe);
}
}
}
private boolean isKeyInBloomFilter(Object key) throws ExecException {
if (key == null) {
// Null values are dropped in a inner join and in the case of outer join,
// POBloomFilterRearrangeTez is only in the plan on the non outer relation.
// So just skip them
return false;
}
if (bloomFilters.length == 1) {
// Skip computing hashcode
Key k = new Key(DataType.toBytes(key, keyType));
return bloomFilters[0].membershipTest(k);
} else {
int partition = (key.hashCode() & Integer.MAX_VALUE) % numBloomFilters;
BloomFilter filter = bloomFilters[partition];
if (filter != null) {
Key k = new Key(DataType.toBytes(key, keyType));
return filter.membershipTest(k);
}
return false;
}
}
@Override
public POBloomFilterRearrangeTez clone() throws CloneNotSupportedException {
return (POBloomFilterRearrangeTez) super.clone();
}
@Override
public String name() {
return getAliasString() + "BloomFilter Rearrange" + "["
+ DataType.findTypeName(resultType) + "]" + "{"
+ DataType.findTypeName(keyType) + "}" + "(" + mIsDistinct
+ ") - " + mKey.toString() + "\t<-\t " + inputKey + "\t->\t " + outputKey;
}
}