| package edu.uci.ics.asterix.optimizer.rules; |
| |
| import java.io.StringReader; |
| import java.util.Collection; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Locale; |
| |
| import org.apache.commons.lang3.mutable.Mutable; |
| import org.apache.commons.lang3.mutable.MutableObject; |
| |
| import edu.uci.ics.asterix.algebra.base.LogicalOperatorDeepCopyVisitor; |
| import edu.uci.ics.asterix.aql.base.Clause; |
| import edu.uci.ics.asterix.aql.expression.Identifier; |
| import edu.uci.ics.asterix.aqlplus.parser.AQLPlusParser; |
| import edu.uci.ics.asterix.aqlplus.parser.ParseException; |
| import edu.uci.ics.asterix.common.exceptions.AsterixException; |
| import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider; |
| import edu.uci.ics.asterix.om.functions.AsterixBuiltinFunctions; |
| import edu.uci.ics.asterix.om.types.IAType; |
| import edu.uci.ics.asterix.om.types.TypeHelper; |
| import edu.uci.ics.asterix.optimizer.base.FuzzyUtils; |
| import edu.uci.ics.asterix.translator.AqlPlusExpressionToPlanTranslator; |
| import edu.uci.ics.hyracks.algebricks.common.exceptions.AlgebricksException; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.base.Counter; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalExpression; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalOperator; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.base.ILogicalPlan; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.base.IOptimizationContext; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalExpressionTag; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.base.LogicalVariable; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.ConstantExpression; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.IndexedNLJoinExpressionAnnotation; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.LeftOuterJoinOperator; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.SelectOperator; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.operators.logical.visitors.VariableUtilities; |
| import edu.uci.ics.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil; |
| import edu.uci.ics.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; |
| |
| public class FuzzyJoinRule implements IAlgebraicRewriteRule { |
| |
| private static HashSet<FunctionIdentifier> simFuncs = new HashSet<FunctionIdentifier>(); |
| static { |
| simFuncs.add(AsterixBuiltinFunctions.SIMILARITY_JACCARD_CHECK); |
| } |
| |
| private static final String AQLPLUS = "" |
| // |
| // -- - Stage 3 - -- |
| // |
| + "((#RIGHT), " |
| + " (join((#LEFT), " |
| // |
| // -- -- - Stage 2 - -- |
| // |
| + " (" |
| + " join( " |
| + " ( " |
| + " #LEFT_1 " |
| + " let $tokensUnrankedLeft := %s($$LEFT_1) " |
| + " let $lenLeft := len($tokensUnrankedLeft) " |
| + " let $tokensLeft := " |
| + " for $token in $tokensUnrankedLeft " |
| + " for $tokenRanked at $i in " |
| // |
| // -- -- -- - Stage 1 - -- |
| // |
| // + " #LEFT_2 " |
| // + " let $id := $$LEFTPK_2 " |
| // + " for $token in %s($$LEFT_2) " |
| + " #RIGHT_2 " |
| + " let $id := $$RIGHTPK_2 " |
| + " for $token in %s($$RIGHT_2) " |
| + " /*+ hash */ " |
| + " group by $tokenGroupped := $token with $id " |
| + " /*+ inmem 34 198608 */ " |
| + " order by count($id), $tokenGroupped " |
| + " return $tokenGroupped " |
| // |
| // -- -- -- - |
| // |
| + " where $token = /*+ bcast */ $tokenRanked " |
| + " order by $i " |
| + " return $i " |
| + " for $prefixTokenLeft in subset-collection($tokensLeft, 0, prefix-len-%s(len($tokensLeft), %ff)) " |
| + " ),( " |
| + " #RIGHT_1 " |
| + " let $tokensUnrankedRight := %s($$RIGHT_1) " |
| + " let $lenRight := len($tokensUnrankedRight) " |
| + " let $tokensRight := " |
| + " for $token in $tokensUnrankedRight " |
| + " for $tokenRanked at $i in " |
| // |
| // -- -- -- - Stage 1 - -- |
| // |
| // + " #LEFT_3 " |
| // + " let $id := $$LEFTPK_3 " |
| // + " for $token in %s($$LEFT_3) " |
| + " #RIGHT_3 " |
| + " let $id := $$RIGHTPK_3 " |
| + " for $token in %s($$RIGHT_3) " |
| + " /*+ hash */ " |
| + " group by $tokenGroupped := $token with $id " |
| + " /*+ inmem 34 198608 */ " |
| + " order by count($id), $tokenGroupped " |
| + " return $tokenGroupped " |
| // |
| // -- -- -- - |
| // |
| + " where $token = /*+ bcast */ $tokenRanked " |
| + " order by $i " |
| + " return $i " |
| + " for $prefixTokenRight in subset-collection($tokensRight, 0, prefix-len-%s(len($tokensRight), %ff)) " |
| + " ), $prefixTokenLeft = $prefixTokenRight) " |
| + " let $sim := similarity-%s-prefix($lenLeft, $tokensLeft, $lenRight, $tokensRight, $prefixTokenLeft, %ff) " |
| + " where $sim >= %ff " + " /*+ hash*/ " |
| + " group by $idLeft := $$LEFTPK_1, $idRight := $$RIGHTPK_1 with $sim " |
| // |
| // -- -- - |
| // |
| + " ), $$LEFTPK = $idLeft)), $$RIGHTPK = $idRight)"; |
| |
| private Collection<LogicalVariable> liveVars = new HashSet<LogicalVariable>(); |
| |
| @Override |
| public boolean rewritePost(Mutable<ILogicalOperator> opRef, IOptimizationContext context) |
| throws AlgebricksException { |
| AbstractLogicalOperator op = (AbstractLogicalOperator) opRef.getValue(); |
| // current opperator is join |
| if (op.getOperatorTag() != LogicalOperatorTag.INNERJOIN |
| && op.getOperatorTag() != LogicalOperatorTag.LEFTOUTERJOIN) { |
| return false; |
| } |
| |
| // Find GET_ITEM function. |
| AbstractBinaryJoinOperator joinOp = (AbstractBinaryJoinOperator) op; |
| Mutable<ILogicalExpression> expRef = joinOp.getCondition(); |
| Mutable<ILogicalExpression> getItemExprRef = getSimilarityExpression(expRef); |
| if (getItemExprRef == null) { |
| return false; |
| } |
| // Check if the GET_ITEM function is on one of the supported similarity-check functions. |
| AbstractFunctionCallExpression getItemFuncExpr = (AbstractFunctionCallExpression) getItemExprRef.getValue(); |
| Mutable<ILogicalExpression> argRef = getItemFuncExpr.getArguments().get(0); |
| AbstractFunctionCallExpression simFuncExpr = (AbstractFunctionCallExpression) argRef.getValue(); |
| if (!simFuncs.contains(simFuncExpr.getFunctionIdentifier())) { |
| return false; |
| } |
| // Skip this rule based on annotations. |
| if (simFuncExpr.getAnnotations().containsKey(IndexedNLJoinExpressionAnnotation.INSTANCE)) { |
| return false; |
| } |
| |
| List<Mutable<ILogicalOperator>> inputOps = joinOp.getInputs(); |
| ILogicalOperator leftInputOp = inputOps.get(0).getValue(); |
| ILogicalOperator rightInputOp = inputOps.get(1).getValue(); |
| |
| List<Mutable<ILogicalExpression>> inputExps = simFuncExpr.getArguments(); |
| |
| ILogicalExpression inputExp0 = inputExps.get(0).getValue(); |
| ILogicalExpression inputExp1 = inputExps.get(1).getValue(); |
| |
| // left and right expressions are variables |
| if (inputExp0.getExpressionTag() != LogicalExpressionTag.VARIABLE |
| || inputExp1.getExpressionTag() != LogicalExpressionTag.VARIABLE) { |
| return false; |
| } |
| |
| LogicalVariable inputVar0 = ((VariableReferenceExpression) inputExp0).getVariableReference(); |
| LogicalVariable inputVar1 = ((VariableReferenceExpression) inputExp1).getVariableReference(); |
| |
| LogicalVariable leftInputVar; |
| LogicalVariable rightInputVar; |
| |
| liveVars.clear(); |
| VariableUtilities.getLiveVariables(leftInputOp, liveVars); |
| if (liveVars.contains(inputVar0)) { |
| leftInputVar = inputVar0; |
| rightInputVar = inputVar1; |
| } else { |
| leftInputVar = inputVar1; |
| rightInputVar = inputVar0; |
| } |
| |
| List<LogicalVariable> leftInputPKs = context.findPrimaryKey(leftInputVar); |
| List<LogicalVariable> rightInputPKs = context.findPrimaryKey(rightInputVar); |
| // Bail if primary keys could not be inferred. |
| if (leftInputPKs == null || rightInputPKs == null) { |
| return false; |
| } |
| // primary key has only one variable |
| if (leftInputPKs.size() != 1 || rightInputPKs.size() != 1) { |
| return false; |
| } |
| IAType leftType = (IAType) context.getOutputTypeEnvironment(leftInputOp).getVarType(leftInputVar); |
| IAType rightType = (IAType) context.getOutputTypeEnvironment(rightInputOp).getVarType(rightInputVar); |
| // left-hand side and right-hand side of "~=" has the same type |
| IAType left2 = TypeHelper.getNonOptionalType(leftType); |
| IAType right2 = TypeHelper.getNonOptionalType(rightType); |
| if (!left2.deepEqual(right2)) { |
| return false; |
| } |
| // |
| // -- - FIRE - -- |
| // |
| AqlMetadataProvider metadataProvider = ((AqlMetadataProvider) context.getMetadataProvider()); |
| FunctionIdentifier funcId = FuzzyUtils.getTokenizer(leftType.getTypeTag()); |
| String tokenizer; |
| if (funcId == null) { |
| tokenizer = ""; |
| } else { |
| tokenizer = funcId.getName(); |
| } |
| |
| float simThreshold = FuzzyUtils.getSimThreshold(metadataProvider); |
| String simFunction = FuzzyUtils.getSimFunction(metadataProvider); |
| |
| // finalize AQL+ query |
| String prepareJoin; |
| switch (joinOp.getJoinKind()) { |
| case INNER: { |
| prepareJoin = "join" + AQLPLUS; |
| break; |
| } |
| case LEFT_OUTER: { |
| // TODO To make it work for Left Outer Joins, we should permute |
| // the #LEFT and #RIGHT at the top of the AQL+ query. But, when |
| // doing this, the |
| // fuzzyjoin/user-vis-int-vis-user-lot-aqlplus_1.aql (the one |
| // doing 3-way fuzzy joins) gives a different result. But even |
| // if we don't change the FuzzyJoinRule, permuting the for |
| // clauses in fuzzyjoin/user-vis-int-vis-user-lot-aqlplus_1.aql |
| // leads to different results, which suggests there is some |
| // other sort of bug. |
| return false; |
| // prepareJoin = "loj" + AQLPLUS; |
| // break; |
| } |
| default: { |
| throw new IllegalStateException(); |
| } |
| } |
| String aqlPlus = String.format(Locale.US, prepareJoin, tokenizer, tokenizer, simFunction, simThreshold, |
| tokenizer, tokenizer, simFunction, simThreshold, simFunction, simThreshold, simThreshold); |
| |
| LogicalVariable leftPKVar = leftInputPKs.get(0); |
| LogicalVariable rightPKVar = rightInputPKs.get(0); |
| |
| Counter counter = new Counter(context.getVarCounter()); |
| |
| AQLPlusParser parser = new AQLPlusParser(new StringReader(aqlPlus)); |
| parser.initScope(); |
| parser.setVarCounter(counter); |
| List<Clause> clauses; |
| try { |
| clauses = parser.Clauses(); |
| } catch (ParseException e) { |
| throw new AlgebricksException(e); |
| } |
| // The translator will compile metadata internally. Run this compilation |
| // under the same transaction id as the "outer" compilation. |
| AqlPlusExpressionToPlanTranslator translator = new AqlPlusExpressionToPlanTranslator( |
| metadataProvider.getJobTxnId(), metadataProvider, counter, null, null); |
| |
| LogicalOperatorDeepCopyVisitor deepCopyVisitor = new LogicalOperatorDeepCopyVisitor(counter); |
| |
| translator.addOperatorToMetaScope(new Identifier("#LEFT"), leftInputOp); |
| translator.addVariableToMetaScope(new Identifier("$$LEFT"), leftInputVar); |
| translator.addVariableToMetaScope(new Identifier("$$LEFTPK"), leftPKVar); |
| |
| translator.addOperatorToMetaScope(new Identifier("#RIGHT"), rightInputOp); |
| translator.addVariableToMetaScope(new Identifier("$$RIGHT"), rightInputVar); |
| translator.addVariableToMetaScope(new Identifier("$$RIGHTPK"), rightPKVar); |
| |
| translator.addOperatorToMetaScope(new Identifier("#LEFT_1"), deepCopyVisitor.deepCopy(leftInputOp, null)); |
| translator.addVariableToMetaScope(new Identifier("$$LEFT_1"), deepCopyVisitor.varCopy(leftInputVar)); |
| translator.addVariableToMetaScope(new Identifier("$$LEFTPK_1"), deepCopyVisitor.varCopy(leftPKVar)); |
| deepCopyVisitor.updatePrimaryKeys(context); |
| deepCopyVisitor.reset(); |
| |
| // translator.addOperatorToMetaScope(new Identifier("#LEFT_2"), |
| // deepCopyVisitor.deepCopy(leftInputOp, null)); |
| // translator.addVariableToMetaScope(new Identifier("$$LEFT_2"), |
| // deepCopyVisitor.varCopy(leftInputVar)); |
| // translator.addVariableToMetaScope(new Identifier("$$LEFTPK_2"), |
| // deepCopyVisitor.varCopy(leftPKVar)); |
| // deepCopyVisitor.updatePrimaryKeys(context); |
| // deepCopyVisitor.reset(); |
| // |
| // translator.addOperatorToMetaScope(new Identifier("#LEFT_3"), |
| // deepCopyVisitor.deepCopy(leftInputOp, null)); |
| // translator.addVariableToMetaScope(new Identifier("$$LEFT_3"), |
| // deepCopyVisitor.varCopy(leftInputVar)); |
| // translator.addVariableToMetaScope(new Identifier("$$LEFTPK_3"), |
| // deepCopyVisitor.varCopy(leftPKVar)); |
| // deepCopyVisitor.updatePrimaryKeys(context); |
| // deepCopyVisitor.reset(); |
| |
| translator.addOperatorToMetaScope(new Identifier("#RIGHT_1"), deepCopyVisitor.deepCopy(rightInputOp, null)); |
| translator.addVariableToMetaScope(new Identifier("$$RIGHT_1"), deepCopyVisitor.varCopy(rightInputVar)); |
| translator.addVariableToMetaScope(new Identifier("$$RIGHTPK_1"), deepCopyVisitor.varCopy(rightPKVar)); |
| deepCopyVisitor.updatePrimaryKeys(context); |
| deepCopyVisitor.reset(); |
| |
| // TODO pick side to run Stage 1, currently always picks RIGHT side |
| translator.addOperatorToMetaScope(new Identifier("#RIGHT_2"), deepCopyVisitor.deepCopy(rightInputOp, null)); |
| translator.addVariableToMetaScope(new Identifier("$$RIGHT_2"), deepCopyVisitor.varCopy(rightInputVar)); |
| translator.addVariableToMetaScope(new Identifier("$$RIGHTPK_2"), deepCopyVisitor.varCopy(rightPKVar)); |
| deepCopyVisitor.updatePrimaryKeys(context); |
| deepCopyVisitor.reset(); |
| |
| translator.addOperatorToMetaScope(new Identifier("#RIGHT_3"), deepCopyVisitor.deepCopy(rightInputOp, null)); |
| translator.addVariableToMetaScope(new Identifier("$$RIGHT_3"), deepCopyVisitor.varCopy(rightInputVar)); |
| translator.addVariableToMetaScope(new Identifier("$$RIGHTPK_3"), deepCopyVisitor.varCopy(rightPKVar)); |
| deepCopyVisitor.updatePrimaryKeys(context); |
| deepCopyVisitor.reset(); |
| |
| ILogicalPlan plan; |
| try { |
| plan = translator.translate(clauses); |
| } catch (AsterixException e) { |
| throw new AlgebricksException(e); |
| } |
| context.setVarCounter(counter.get()); |
| |
| ILogicalOperator outputOp = plan.getRoots().get(0).getValue(); |
| |
| SelectOperator extraSelect = null; |
| if (getItemExprRef != expRef) { |
| // more than one join condition |
| getItemExprRef.setValue(ConstantExpression.TRUE); |
| switch (joinOp.getJoinKind()) { |
| case INNER: { |
| extraSelect = new SelectOperator(expRef); |
| extraSelect.getInputs().add(new MutableObject<ILogicalOperator>(outputOp)); |
| outputOp = extraSelect; |
| break; |
| } |
| case LEFT_OUTER: { |
| if (((AbstractLogicalOperator) outputOp).getOperatorTag() != LogicalOperatorTag.LEFTOUTERJOIN) { |
| throw new IllegalStateException(); |
| } |
| LeftOuterJoinOperator topJoin = (LeftOuterJoinOperator) outputOp; |
| topJoin.getCondition().setValue(expRef.getValue()); |
| break; |
| } |
| default: { |
| throw new IllegalStateException(); |
| } |
| } |
| } |
| opRef.setValue(outputOp); |
| OperatorPropertiesUtil.typeOpRec(opRef, context); |
| return true; |
| } |
| |
| /** |
| * Look for GET_ITEM function call. |
| */ |
| private Mutable<ILogicalExpression> getSimilarityExpression(Mutable<ILogicalExpression> expRef) { |
| ILogicalExpression exp = expRef.getValue(); |
| if (exp.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) { |
| AbstractFunctionCallExpression funcExpr = (AbstractFunctionCallExpression) exp; |
| if (funcExpr.getFunctionIdentifier().equals(AsterixBuiltinFunctions.GET_ITEM)) { |
| return expRef; |
| } |
| if (funcExpr.getFunctionIdentifier().equals(AlgebricksBuiltinFunctions.AND)) { |
| for (int i = 0; i < 2; i++) { |
| Mutable<ILogicalExpression> expRefRet = getSimilarityExpression(funcExpr.getArguments().get(i)); |
| if (expRefRet != null) { |
| return expRefRet; |
| } |
| } |
| } |
| } |
| return null; |
| } |
| |
| @Override |
| public boolean rewritePre(Mutable<ILogicalOperator> opRef, IOptimizationContext context) throws AlgebricksException { |
| return false; |
| } |
| } |