// blob: 8a746eb0aa809767e4c2091f65381c0911a0d2c4 [file] [log] [blame]
package edu.uci.ics.asterix.optimizer.rules;
import java.io.StringReader;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
import edu.uci.ics.asterix.algebra.base.LogicalOperatorDeepCopyVisitor;
import edu.uci.ics.asterix.aql.base.Clause;
import edu.uci.ics.asterix.aql.expression.Identifier;
import edu.uci.ics.asterix.aqlplus.parser.AQLPlusParser;
import edu.uci.ics.asterix.aqlplus.parser.ParseException;
import edu.uci.ics.asterix.common.exceptions.AsterixException;
import edu.uci.ics.asterix.metadata.declared.AqlCompiledMetadataDeclarations;
import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider;
import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions;
import edu.uci.ics.asterix.om.types.IAType;
import edu.uci.ics.asterix.om.types.TypeHelper;
import edu.uci.ics.asterix.optimizer.base.FuzzyUtils;
import edu.uci.ics.asterix.translator.AqlPlusExpressionToPlanTranslator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.Counter;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag;
import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator;
import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities;
import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil;
import edu.uci.ics.hyracks.algebricks.core.api.exceptions.AlgebricksException;
import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule;
/**
 * Rewrite rule that replaces a join whose condition contains a fuzzy-equality predicate (a call to
 * {@code AsterixBuiltinFunctions.FUZZY_EQ}) with a plan generated from the AQL+ template {@link #AQLPLUS}.
 * <p>
 * The rule fires only when all of the following hold:
 * <ul>
 * <li>the operator is an inner join (left outer joins are recognized but currently rejected, see the TODO in
 * {@link #rewritePost});</li>
 * <li>both arguments of the fuzzy predicate are variable references, one live in each join input;</li>
 * <li>each of those variables has a known primary key made of exactly one variable;</li>
 * <li>both variables have the same type once optionality is stripped.</li>
 * </ul>
 * The rule keeps no state between invocations.
 */
public class FuzzyJoinRule implements IAlgebraicRewriteRule {

    /**
     * AQL+ template of the rewritten join. It is completed by prefixing the join keyword and by
     * {@link String#format(Locale, String, Object...)} with, in order: tokenizer, tokenizer, similarity function,
     * threshold (left branch); tokenizer, tokenizer, similarity function, threshold (right branch); similarity
     * function, threshold, threshold (verification). Note that {@code %ff} is the {@code %f} conversion followed
     * by a literal {@code f}.
     * <p>
     * Meta identifiers ({@code #LEFT}, {@code $$LEFT}, {@code $$LEFTPK}, {@code #RIGHT_1}, ...) are bound to
     * operators and variables in {@link #rewritePost} before translation.
     */
    private static final String AQLPLUS = ""
            //
            // -- - Stage 3 - --
            // Joins the (left PK, right PK) pairs produced by Stage 2 back with the original inputs.
            //
            + "((#RIGHT), "
            + " (join((#LEFT), "
            //
            // -- -- - Stage 2 - --
            // Joins both sides on their prefix tokens and keeps the pairs whose similarity reaches the threshold.
            //
            + " ("
            + " join( "
            + " ( "
            + " #LEFT_1 "
            + " let $tokensUnrankedLeft := %s($$LEFT_1) "
            + " let $lenLeft := len($tokensUnrankedLeft) "
            + " let $tokensLeft := "
            + " for $token in $tokensUnrankedLeft "
            + " for $tokenRanked at $i in "
            //
            // -- -- -- - Stage 1 - --
            // Token ordering, computed on a copy of the RIGHT input. To compute it on the LEFT input instead,
            // the template would use #LEFT_2 / $$LEFTPK_2 / $$LEFT_2 here (and bind them in rewritePost).
            //
            + " #RIGHT_2 "
            + " let $id := $$RIGHTPK_2 "
            + " for $token in %s($$RIGHT_2) "
            + " /*+ hash */ "
            + " group by $tokenGroupped := $token with $id "
            + " /*+ inmem 34 198608 */ "
            + " order by count($id), $tokenGroupped "
            + " return $tokenGroupped "
            //
            // -- -- -- -
            //
            + " where $token = /*+ bcast */ $tokenRanked "
            + " order by $i "
            + " return $i "
            + " for $prefixTokenLeft in subset-collection($tokensLeft, 0, prefix-len-%s(len($tokensLeft), %ff)) "
            + " ),( "
            + " #RIGHT_1 "
            + " let $tokensUnrankedRight := %s($$RIGHT_1) "
            + " let $lenRight := len($tokensUnrankedRight) "
            + " let $tokensRight := "
            + " for $token in $tokensUnrankedRight "
            + " for $tokenRanked at $i in "
            //
            // -- -- -- - Stage 1 - --
            // Same token ordering as above, on another copy of the RIGHT input (#LEFT_3 / $$LEFTPK_3 /
            // $$LEFT_3 would be the LEFT-side alternative).
            //
            + " #RIGHT_3 "
            + " let $id := $$RIGHTPK_3 "
            + " for $token in %s($$RIGHT_3) "
            + " /*+ hash */ "
            + " group by $tokenGroupped := $token with $id "
            + " /*+ inmem 34 198608 */ "
            + " order by count($id), $tokenGroupped "
            + " return $tokenGroupped "
            //
            // -- -- -- -
            //
            + " where $token = /*+ bcast */ $tokenRanked "
            + " order by $i "
            + " return $i "
            + " for $prefixTokenRight in subset-collection($tokensRight, 0, prefix-len-%s(len($tokensRight), %ff)) "
            + " ), $prefixTokenLeft = $prefixTokenRight) "
            + " let $sim := similarity-%s-prefix($lenLeft, $tokensLeft, $lenRight, $tokensRight, $prefixTokenLeft, %ff) "
            + " where $sim >= %ff " + " /*+ hash*/ "
            + " group by $idLeft := $$LEFTPK_1, $idRight := $$RIGHTPK_1 with $sim "
            //
            // -- -- -
            //
            + " ), $$LEFTPK = $idLeft)), $$RIGHTPK = $idRight)";

    /**
     * Attempts to rewrite the join referenced by {@code opRef} into the plan described by {@link #AQLPLUS}.
     *
     * @param opRef
     *            reference to the operator under consideration; on success its value is replaced by the root of
     *            the generated plan (wrapped in a select when the join had additional conditions)
     * @param context
     *            optimization context, used for primary keys, type environments, the metadata provider and the
     *            variable counter
     * @return {@code true} if the operator was rewritten, {@code false} if the rule does not apply
     * @throws AlgebricksException
     *             if the generated AQL+ cannot be parsed or translated
     */
    @Override
    public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
            throws AlgebricksException {
        AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue();
        // current operator is a join
        if (op.getOperatorTag() != LogicalOperatorTag.INNERJOIN
                && op.getOperatorTag() != LogicalOperatorTag.LEFTOUTERJOIN) {
            return false;
        }

        // find fuzzy join condition
        AbstractBinaryJoinOperator joinOp = (AbstractBinaryJoinOperator) op;
        Mutable<ILogicalExpression> expRef = joinOp.getCondition();
        Mutable<ILogicalExpression> fuzzyExpRef = getSimilarityExpression(expRef);
        if (fuzzyExpRef == null) {
            return false;
        }
        AbstractFunctionCallExpression funcExp = (AbstractFunctionCallExpression) fuzzyExpRef.getValue();

        List<Mutable<ILogicalOperator>> inputOps = joinOp.getInputs();
        ILogicalOperator leftInputOp = inputOps.get(0).getValue();
        ILogicalOperator rightInputOp = inputOps.get(1).getValue();

        List<Mutable<ILogicalExpression>> inputExps = funcExp.getArguments();
        if (inputExps.size() < 2) {
            // a malformed fuzzy predicate is not something this rule can rewrite
            return false;
        }
        ILogicalExpression inputExp0 = inputExps.get(0).getValue();
        ILogicalExpression inputExp1 = inputExps.get(1).getValue();
        // left and right expressions are variables
        if (inputExp0.getExpressionTag() != LogicalExpressionTag.VARIABLE
                || inputExp1.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
            return false;
        }
        LogicalVariable inputVar0 = ((VariableReferenceExpression) inputExp0).getVariableReference();
        LogicalVariable inputVar1 = ((VariableReferenceExpression) inputExp1).getVariableReference();

        // Decide which argument belongs to which join input.
        Collection<LogicalVariable> leftLiveVars = new HashSet<LogicalVariable>();
        VariableUtilities.getLiveVariables(leftInputOp, leftLiveVars);
        LogicalVariable leftInputVar;
        LogicalVariable rightInputVar;
        if (leftLiveVars.contains(inputVar0)) {
            leftInputVar = inputVar0;
            rightInputVar = inputVar1;
        } else {
            leftInputVar = inputVar1;
            rightInputVar = inputVar0;
        }
        // The template binds $$LEFT to #LEFT and $$RIGHT to #RIGHT, so each variable must really be produced by
        // the input it was assigned to; otherwise (e.g. both variables on the same side) do not rewrite.
        Collection<LogicalVariable> rightLiveVars = new HashSet<LogicalVariable>();
        VariableUtilities.getLiveVariables(rightInputOp, rightLiveVars);
        if (!leftLiveVars.contains(leftInputVar) || !rightLiveVars.contains(rightInputVar)) {
            return false;
        }

        List<LogicalVariable> leftInputPKs = context.findPrimaryKey(leftInputVar);
        List<LogicalVariable> rightInputPKs = context.findPrimaryKey(rightInputVar);
        // bail out if a primary key is not known for either side
        if (leftInputPKs == null || rightInputPKs == null) {
            return false;
        }
        // primary key has only one variable
        if (leftInputPKs.size() != 1 || rightInputPKs.size() != 1) {
            return false;
        }

        IAType leftType = (IAType) context.getOutputTypeEnvironment(leftInputOp).getVarType(leftInputVar);
        IAType rightType = (IAType) context.getOutputTypeEnvironment(rightInputOp).getVarType(rightInputVar);
        // left-hand side and right-hand side of "~=" have the same type (ignoring optionality)
        IAType left2 = TypeHelper.getNonOptionalType(leftType);
        IAType right2 = TypeHelper.getNonOptionalType(rightType);
        if (!left2.deepEqual(right2)) {
            return false;
        }

        //
        // -- - FIRE - --
        //
        AqlMetadataProvider mp = (AqlMetadataProvider) context.getMetadataProvider();
        AqlCompiledMetadataDeclarations metadata = mp.getMetadataDeclarations();

        // NOTE(review): the tokenizer is looked up with the declared (possibly optional) type's tag rather than
        // the non-optional one used for the comparison above - confirm this is intended.
        FunctionIdentifier funcId = FuzzyUtils.getTokenizer(leftType.getTypeTag());
        // With no tokenizer the template degenerates to a parenthesized variable reference.
        String tokenizer = (funcId == null) ? "" : funcId.getName();

        float simThreshold = FuzzyUtils.getSimThreshold(metadata);
        String simFunction = FuzzyUtils.getSimFunction(metadata);

        // finalize AQL+ query
        String prepareJoin;
        switch (joinOp.getJoinKind()) {
            case INNER: {
                prepareJoin = "join" + AQLPLUS;
                break;
            }
            case LEFT_OUTER: {
                // TODO To make it work for Left Outer Joins, we should permute
                // the #LEFT and #RIGHT at the top of the AQL+ query. But, when
                // doing this, the
                // fuzzyjoin/user-vis-int-vis-user-lot-aqlplus_1.aql (the one
                // doing 3-way fuzzy joins) gives a different result. But even
                // if we don't change the FuzzyJoinRule, permuting the for
                // clauses in fuzzyjoin/user-vis-int-vis-user-lot-aqlplus_1.aql
                // leads to different results, which suggests there is some
                // other sort of bug.
                return false;
                // prepareJoin = "loj" + AQLPLUS;
                // break;
            }
            default: {
                throw new IllegalStateException("Unexpected join kind: " + joinOp.getJoinKind());
            }
        }
        // Locale.US keeps the decimal separator of the threshold a '.' regardless of the default locale.
        String aqlPlus = String.format(Locale.US, prepareJoin, tokenizer, tokenizer, simFunction, simThreshold,
                tokenizer, tokenizer, simFunction, simThreshold, simFunction, simThreshold, simThreshold);

        LogicalVariable leftPKVar = leftInputPKs.get(0);
        LogicalVariable rightPKVar = rightInputPKs.get(0);

        // The parser, the translator and the deep-copy visitor share one counter so that the variables they
        // create do not collide; the final value is written back to the context after translation.
        Counter counter = new Counter(context.getVarCounter());

        AQLPlusParser parser = new AQLPlusParser(new StringReader(aqlPlus));
        parser.initScope();
        parser.setVarCounter(counter);
        List<Clause> clauses;
        try {
            clauses = parser.Clauses();
        } catch (ParseException e) {
            throw new AlgebricksException(e);
        }

        // The translator will compile metadata internally. Run this compilation
        // under the same transaction id as the "outer" compilation.
        AqlPlusExpressionToPlanTranslator translator = new AqlPlusExpressionToPlanTranslator(mp.getTxnId(),
                metadata.getMetadataTransactionContext(), counter, null);

        // Stage 3 uses the original inputs.
        translator.addOperatorToMetaScope(new Identifier("#LEFT"), leftInputOp);
        translator.addVariableToMetaScope(new Identifier("$$LEFT"), leftInputVar);
        translator.addVariableToMetaScope(new Identifier("$$LEFTPK"), leftPKVar);

        translator.addOperatorToMetaScope(new Identifier("#RIGHT"), rightInputOp);
        translator.addVariableToMetaScope(new Identifier("$$RIGHT"), rightInputVar);
        translator.addVariableToMetaScope(new Identifier("$$RIGHTPK"), rightPKVar);

        // Stages 1 and 2 work on deep copies of the inputs.
        LogicalOperatorDeepCopyVisitor deepCopyVisitor = new LogicalOperatorDeepCopyVisitor(counter);
        bindInputCopy(translator, deepCopyVisitor, context, "LEFT", 1, leftInputOp, leftInputVar, leftPKVar);
        bindInputCopy(translator, deepCopyVisitor, context, "RIGHT", 1, rightInputOp, rightInputVar, rightPKVar);
        // TODO pick side to run Stage 1, currently always picks RIGHT side
        // (the LEFT-side alternative would bind copies 2 and 3 of the left input instead)
        bindInputCopy(translator, deepCopyVisitor, context, "RIGHT", 2, rightInputOp, rightInputVar, rightPKVar);
        bindInputCopy(translator, deepCopyVisitor, context, "RIGHT", 3, rightInputOp, rightInputVar, rightPKVar);

        ILogicalPlan plan;
        try {
            plan = translator.translate(clauses);
        } catch (AsterixException e) {
            throw new AlgebricksException(e);
        }
        context.setVarCounter(counter.get());

        ILogicalOperator outputOp = plan.getRoots().get(0).getValue();

        if (fuzzyExpRef != expRef) {
            // more than one join condition: the fuzzy part is now handled by the generated plan, so neutralize
            // it and re-apply what is left of the original condition
            fuzzyExpRef.setValue(ConstantExpression.TRUE);
            switch (joinOp.getJoinKind()) {
                case INNER: {
                    SelectOperator extraSelect = new SelectOperator(expRef);
                    extraSelect.getInputs().add(new MutableObject<ILogicalOperator>(outputOp));
                    outputOp = extraSelect;
                    break;
                }
                case LEFT_OUTER: {
                    // Not reachable while left outer joins are rejected above; kept for when they are enabled.
                    if (((AbstractLogicalOperator) outputOp).getOperatorTag() != LogicalOperatorTag.LEFTOUTERJOIN) {
                        throw new IllegalStateException("Expected a left outer join at the root of the plan");
                    }
                    LeftOuterJoinOperator topJoin = (LeftOuterJoinOperator) outputOp;
                    topJoin.getCondition().setValue(expRef.getValue());
                    break;
                }
                default: {
                    throw new IllegalStateException("Unexpected join kind: " + joinOp.getJoinKind());
                }
            }
        }
        opRef.setValue(outputOp);
        OperatorPropertiesUtil.typeOpRec(opRef, context);
        return true;
    }

    /**
     * Deep-copies one join input and binds the copy, together with the copies of its join variable and of its
     * primary-key variable, to the meta identifiers {@code #<side>_<n>}, {@code $$<side>_<n>} and
     * {@code $$<side>PK_<n>}. Afterwards the visitor's primary-key information is pushed to the context and the
     * visitor is reset so it can be reused for the next copy.
     */
    private static void bindInputCopy(AqlPlusExpressionToPlanTranslator translator,
            LogicalOperatorDeepCopyVisitor deepCopyVisitor, IOptimizationContext context, String side,
            int copyIndex, ILogicalOperator inputOp, LogicalVariable inputVar, LogicalVariable pkVar)
            throws AlgebricksException {
        String suffix = "_" + copyIndex;
        translator.addOperatorToMetaScope(new Identifier("#" + side + suffix),
                deepCopyVisitor.deepCopy(inputOp, null));
        translator.addVariableToMetaScope(new Identifier("$$" + side + suffix), deepCopyVisitor.varCopy(inputVar));
        translator.addVariableToMetaScope(new Identifier("$$" + side + "PK" + suffix),
                deepCopyVisitor.varCopy(pkVar));
        deepCopyVisitor.updatePrimaryKeys(context);
        deepCopyVisitor.reset();
    }

    /**
     * Looks for a FUZZY_EQ function call in a join condition, searching depth-first through AND/OR calls.
     *
     * @param expRef
     *            reference to the (sub)expression to search
     * @return the reference holding the first FUZZY_EQ call found (which is {@code expRef} itself when the whole
     *         expression is that call), or {@code null} if there is none
     */
    private Mutable<ILogicalExpression> getSimilarityExpression(Mutable<ILogicalExpression> expRef) {
        ILogicalExpression exp = expRef.getValue();
        if (exp.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
            return null;
        }
        AbstractFunctionCallExpression funcExp = (AbstractFunctionCallExpression) exp;
        FunctionIdentifier fid = funcExp.getFunctionIdentifier();
        if (fid.equals(AsterixBuiltinFunctions.FUZZY_EQ)) {
            return expRef;
        }
        // NOTE(review): descending through OR looks unsafe - rewritePost replaces the returned predicate with
        // TRUE and re-applies the rest of the condition on top of the fuzzy join, which only preserves the
        // result for conjunctions. Kept as in the original behavior; confirm whether OR should be excluded.
        if (fid.equals(AlgebricksBuiltinFunctions.AND) || fid.equals(AlgebricksBuiltinFunctions.OR)) {
            // visit every argument, not just the first two
            for (Mutable<ILogicalExpression> argRef : funcExp.getArguments()) {
                Mutable<ILogicalExpression> found = getSimilarityExpression(argRef);
                if (found != null) {
                    return found;
                }
            }
        }
        return null;
    }

    /**
     * Does nothing on the way down the plan; all the work happens in {@link #rewritePost}.
     *
     * @return always {@code false}
     */
    @Override
    public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context)
            throws AlgebricksException {
        return false;
    }
}