blob: ab73df25a09723adeb7a08daeba15c1d8c3a9d3a [file] [log] [blame]
package edu.uci.ics.asterix.runtime.evaluators.common;
import java.io.IOException;
import edu.uci.ics.asterix.builders.OrderedListBuilder;
import edu.uci.ics.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
import edu.uci.ics.asterix.formats.nontagged.AqlSerializerDeserializerProvider;
import edu.uci.ics.asterix.om.base.ABoolean;
import edu.uci.ics.asterix.om.types.AOrderedListType;
import edu.uci.ics.asterix.om.types.BuiltinType;
import edu.uci.ics.asterix.runtime.evaluators.functions.BinaryHashMap.BinaryEntry;
import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluator;
import edu.uci.ics.hyracks.algebricks.runtime.base.ICopyEvaluatorFactory;
import edu.uci.ics.hyracks.api.dataflow.value.ISerializerDeserializer;
import edu.uci.ics.hyracks.data.std.api.IDataOutputProvider;
import edu.uci.ics.hyracks.data.std.primitive.IntegerPointable;
import edu.uci.ics.hyracks.data.std.util.ArrayBackedValueStorage;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public class SimilarityJaccardCheckEvaluator extends SimilarityJaccardEvaluator {
protected final ICopyEvaluator jaccThreshEval;
protected float jaccThresh = -1f;
protected OrderedListBuilder listBuilder;
protected ArrayBackedValueStorage inputVal;
@SuppressWarnings("unchecked")
protected final ISerializerDeserializer<ABoolean> booleanSerde = AqlSerializerDeserializerProvider.INSTANCE
.getSerializerDeserializer(BuiltinType.ABOOLEAN);
protected final AOrderedListType listType = new AOrderedListType(BuiltinType.ANY, "list");
public SimilarityJaccardCheckEvaluator(ICopyEvaluatorFactory[] args, IDataOutputProvider output)
throws AlgebricksException {
super(args, output);
jaccThreshEval = args[2].createEvaluator(argOut);
listBuilder = new OrderedListBuilder();
inputVal = new ArrayBackedValueStorage();
}
@Override
protected void runArgEvals(IFrameTupleReference tuple) throws AlgebricksException {
super.runArgEvals(tuple);
int jaccThreshStart = argOut.getLength();
jaccThreshEval.evaluate(tuple);
jaccThresh = (float) AFloatSerializerDeserializer.getFloat(argOut.getByteArray(), jaccThreshStart
+ TYPE_INDICATOR_SIZE);
}
@Override
protected int probeHashMap(AbstractAsterixListIterator probeIter, int buildListSize, int probeListSize) {
// Apply length filter.
int lengthLowerBound = (int) Math.ceil(jaccThresh * probeListSize);
if ((lengthLowerBound > buildListSize) || (buildListSize > (int) Math.floor(1.0f / jaccThresh * probeListSize))) {
return -1;
}
// Probe phase: Probe items from second list, and compute intersection size.
int intersectionSize = 0;
int probeListCount = 0;
int minUnionSize = probeListSize;
while (probeIter.hasNext()) {
probeListCount++;
byte[] buf = probeIter.getData();
int off = probeIter.getPos();
int len = getItemLen(buf, off);
keyEntry.set(buf, off, len);
BinaryEntry entry = hashMap.get(keyEntry);
if (entry != null) {
// Increment second value.
int firstValInt = IntegerPointable.getInteger(entry.buf, entry.off);
// Irrelevant for the intersection size.
if (firstValInt == 0) {
continue;
}
int secondValInt = IntegerPointable.getInteger(entry.buf, entry.off + 4);
// Subtract old min value.
intersectionSize -= (firstValInt < secondValInt) ? firstValInt : secondValInt;
secondValInt++;
// Add new min value.
intersectionSize += (firstValInt < secondValInt) ? firstValInt : secondValInt;
IntegerPointable.setInteger(entry.buf, entry.off + 4, secondValInt);
} else {
// Could not find element in other set. Increase min union size by 1.
minUnionSize++;
// Check whether jaccThresh can still be satisfied if there was a mismatch.
int maxIntersectionSize = intersectionSize + (probeListSize - probeListCount);
int lowerBound = (int) Math.floor(jaccThresh * minUnionSize);
if (maxIntersectionSize < lowerBound) {
// Cannot satisfy jaccThresh.
return -1;
}
}
probeIter.next();
}
return intersectionSize;
}
@Override
protected void writeResult(float jacc) throws IOException {
listBuilder.reset(listType);
boolean matches = (jacc < jaccThresh) ? false : true;
inputVal.reset();
booleanSerde.serialize(matches ? ABoolean.TRUE : ABoolean.FALSE, inputVal.getDataOutput());
listBuilder.addItem(inputVal);
inputVal.reset();
aFloat.setValue((matches) ? jacc : 0.0f);
floatSerde.serialize(aFloat, inputVal.getDataOutput());
listBuilder.addItem(inputVal);
listBuilder.write(out, true);
}
}