// blob: 4f819ada26a8d13f114a16fd17a72f26eb36f6a6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.engine.planner.physical;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SchemaUtil;
import org.apache.tajo.engine.planner.Projector;
import org.apache.tajo.plan.expr.AlgebraicUtil;
import org.apache.tajo.plan.expr.BinaryEval;
import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.plan.expr.EvalTreeUtil;
import org.apache.tajo.plan.logical.JoinNode;
import org.apache.tajo.storage.FrameTuple;
import org.apache.tajo.storage.NullTuple;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
/**
 * Common base executor for all join executors.
 *
 * <p>At construction time the join qualifier of the logical plan is split into a
 * cross-table join condition and two single-table filters (one per input). The
 * class also owns the projector and the frame tuple used by join implementations.
 */
public abstract class CommonJoinExec extends BinaryPhysicalExec {

  // from logical plan
  protected JoinNode plan;
  // true iff a cross-table join condition was extracted at construction time
  protected final boolean hasJoinQual;

  protected EvalNode joinQual;         // ex) a.id = b.id
  protected EvalNode leftJoinFilter;   // ex) a > 10
  protected EvalNode rightJoinFilter;  // ex) b > 5

  protected final Schema leftSchema;
  protected final Schema rightSchema;

  protected final FrameTuple frameTuple;

  // projection
  protected Projector projector;

  /**
   * Builds the shared join state from the logical join node and the two child executors.
   *
   * @param context task attempt context
   * @param plan    logical join node supplying the join qualifier, targets and output schema
   * @param outer   child executor whose schema is used as the left schema
   * @param inner   child executor whose schema is used as the right schema
   */
  public CommonJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer, PhysicalExec inner) {
    super(context, SchemaUtil.merge(outer.getSchema(), inner.getSchema()),
        plan.getOutSchema(), outer, inner);

    this.plan = plan;
    this.leftSchema = outer.getSchema();
    this.rightSchema = inner.getSchema();

    if (plan.hasJoinQual()) {
      final EvalNode[] parts = extractJoinConditions(plan.getJoinQual(), leftSchema, rightSchema);
      this.joinQual = parts[0];
      this.leftJoinFilter = parts[1];
      this.rightJoinFilter = parts[2];
    }
    // The plan may carry a qualifier that yields no cross-table condition at all,
    // so the flag is derived from the extracted condition rather than from the plan.
    this.hasJoinQual = (this.joinQual != null);

    // for projection
    this.projector = new Projector(context, inSchema, outSchema, plan.getTargets());

    // for join
    this.frameTuple = new FrameTuple();
  }

  /**
   * Splits a join condition, taken as a conjunction (CNF), into a join condition,
   * a left join filter and a right join filter.
   *
   * <p>Each conjunct is classified by which input schemas contain the columns of its
   * two operands:
   * <ul>
   *   <li>columns of the left schema only: left join filter;</li>
   *   <li>columns of the right schema only: right join filter;</li>
   *   <li>columns of both schemas, with neither schema covering both operands:
   *       join condition.</li>
   * </ul>
   * Conjuncts that are not a {@link BinaryEval}, that reference no column of either
   * schema, or whose two operands both reference the same schema while the conjunct
   * spans both schemas are skipped and appear in none of the results.
   *
   * @param joinQual    the original join condition
   * @param leftSchema  Left table schema
   * @param rightSchema Right table schema
   * @return Three element EvalNodes, 0 - join condition, 1 - left join filter, 2 - right join filter;
   *         an element is null when no conjunct fell into that category.
   */
  private EvalNode[] extractJoinConditions(EvalNode joinQual, Schema leftSchema, Schema rightSchema) {
    List<EvalNode> joinQuals = Lists.newArrayList();
    List<EvalNode> leftFilters = Lists.newArrayList();
    List<EvalNode> rightFilters = Lists.newArrayList();

    for (EvalNode conjunct : AlgebraicUtil.toConjunctiveNormalFormArray(joinQual)) {
      if (!(conjunct instanceof BinaryEval)) {
        continue; // todo 'between', etc.
      }

      BinaryEval binary = (BinaryEval) conjunct;
      LinkedHashSet<Column> lhsColumns = EvalTreeUtil.findUniqueColumns(binary.getLeftExpr());
      LinkedHashSet<Column> rhsColumns = EvalTreeUtil.findUniqueColumns(binary.getRightExpr());

      boolean lhsTouchesLeft = leftSchema.containsAny(lhsColumns);
      boolean rhsTouchesLeft = leftSchema.containsAny(rhsColumns);
      boolean lhsTouchesRight = rightSchema.containsAny(lhsColumns);
      boolean rhsTouchesRight = rightSchema.containsAny(rhsColumns);

      boolean touchesLeft = lhsTouchesLeft || rhsTouchesLeft;
      boolean touchesRight = lhsTouchesRight || rhsTouchesRight;

      if (touchesLeft && touchesRight) {
        boolean operandsShareTable =
            (lhsTouchesLeft && rhsTouchesLeft) || (lhsTouchesRight && rhsTouchesRight);
        if (!operandsShareTable) {
          joinQuals.add(conjunct);
        }
        // otherwise skipped. todo not allowed yet : this should be checked in logical phase
      } else if (touchesLeft) {
        leftFilters.add(conjunct);
      } else if (touchesRight) {
        rightFilters.add(conjunct);
      }
      // neither side: skipped. todo constant expression : this should be done in logical phase
    }

    return new EvalNode[] {
        conjunctionOrNull(joinQuals),
        conjunctionOrNull(leftFilters),
        conjunctionOrNull(rightFilters)
    };
  }

  /** Combines the given conjuncts into a single expression, or returns null for an empty list. */
  private static EvalNode conjunctionOrNull(List<EvalNode> conjuncts) {
    if (conjuncts.isEmpty()) {
      return null;
    }
    return AlgebraicUtil.createSingletonExprFromCNF(conjuncts);
  }

  /**
   * Returns the logical join node this executor was created from.
   *
   * @return the join plan, or null once {@link #close()} has been called
   */
  public JoinNode getPlan() {
    return plan;
  }

  /**
   * Tests whether a left tuple is rejected by the left join filter.
   *
   * @param left Tuple to be evaluated
   * @return True if a left join filter exists and the tuple does NOT satisfy it;
   *         false if there is no left join filter or the tuple satisfies it
   */
  protected boolean leftFiltered(Tuple left) {
    if (leftJoinFilter == null) {
      return false;
    }
    return !leftJoinFilter.eval(left).asBool();
  }

  /**
   * Tests whether a right tuple is rejected by the right join filter.
   *
   * @param right Tuple to be evaluated
   * @return True if a right join filter exists and the tuple does NOT satisfy it;
   *         false if there is no right join filter or the tuple satisfies it
   */
  protected boolean rightFiltered(Tuple right) {
    if (rightJoinFilter == null) {
      return false;
    }
    return !rightJoinFilter.eval(right).asBool();
  }

  /**
   * Returns an iterator over the rows of the right table that satisfy the right join filter.
   * The argument must supply rows of the right table.
   *
   * @param rightTuples rows of the right table; may be null
   * @return an empty iterator if {@code rightTuples} is null, the plain iterator if there is
   *         no right join filter, otherwise an iterator yielding only the rows for which
   *         the right join filter evaluates to true
   */
  protected Iterator<Tuple> rightFiltered(Iterable<Tuple> rightTuples) {
    if (rightTuples == null) {
      return Iterators.emptyIterator();
    }

    final Iterator<Tuple> source = rightTuples.iterator();
    if (rightJoinFilter == null) {
      return source;
    }

    // The predicate reads the rightJoinFilter field each time it is applied.
    Predicate<Tuple> passesRightFilter = new Predicate<Tuple>() {
      @Override
      public boolean apply(Tuple input) {
        return rightJoinFilter.eval(input).asBool();
      }
    };
    return Iterators.filter(source, passesRightFilter);
  }

  /**
   * Create a list that contains a single null tuple.
   *
   * @param width the width of the null tuple which will be created.
   * @return a list holding exactly one null tuple of the given width
   */
  protected List<Tuple> nullTupleList(int width) {
    Tuple nullTuple = NullTuple.create(width);
    return Arrays.asList(nullTuple);
  }

  /**
   * Initializes the parent executor and then binds each extracted expression to the
   * evaluation context: the join condition against the input schema, and the left and
   * right filters against the left and right schema respectively.
   */
  @Override
  public void init() throws IOException {
    super.init();

    // join condition is evaluated over the merged input schema
    if (hasJoinQual) {
      joinQual.bind(context.getEvalContext(), inSchema);
    }
    // each side filter is evaluated over the schema of its own input
    if (leftJoinFilter != null) {
      leftJoinFilter.bind(context.getEvalContext(), leftSchema);
    }
    if (rightJoinFilter != null) {
      rightJoinFilter.bind(context.getEvalContext(), rightSchema);
    }
  }

  /**
   * Replaces the join condition with the precompiled evaluator obtained from the context.
   * The left and right join filters are left as they are.
   */
  @Override
  protected void compile() {
    if (!hasJoinQual) {
      return;
    }
    joinQual = context.getPrecompiledEval(inSchema, joinQual);
    // compile filters?
  }

  /**
   * Closes the parent executor and drops the references to the plan, the extracted
   * expressions and the projector. The final {@code hasJoinQual} flag keeps its value.
   */
  @Override
  public void close() throws IOException {
    super.close();
    projector = null;
    rightJoinFilter = null;
    leftJoinFilter = null;
    joinQual = null;
    plan = null;
  }

  /**
   * @return the simple class name followed by the left and right schema,
   *         formatted as {@code Name [left : right]}
   */
  @Override
  public String toString() {
    return new StringBuilder(getClass().getSimpleName())
        .append(" [")
        .append(leftSchema)
        .append(" : ")
        .append(rightSchema)
        .append("]")
        .toString();
  }
}