blob: fb70aea93608bcee508ecc7224c4babd6e0c9b57 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.optimizer.rules.util;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.algebra.operators.physical.IntervalIndexJoinPOperator;
import org.apache.asterix.algebra.operators.physical.IntervalPartitionJoinPOperator;
import org.apache.asterix.common.annotations.IntervalJoinExpressionAnnotation;
import org.apache.asterix.lang.common.util.FunctionUtil;
import org.apache.asterix.om.base.AInt32;
import org.apache.asterix.om.constants.AsterixConstantValue;
import org.apache.asterix.om.functions.AsterixBuiltinFunctions;
import org.apache.asterix.runtime.operators.joins.AfterIntervalMergeJoinCheckerFactory;
import org.apache.asterix.runtime.operators.joins.BeforeIntervalMergeJoinCheckerFactory;
import org.apache.asterix.runtime.operators.joins.CoveredByIntervalMergeJoinCheckerFactory;
import org.apache.asterix.runtime.operators.joins.CoversIntervalMergeJoinCheckerFactory;
import org.apache.asterix.runtime.operators.joins.EndedByIntervalMergeJoinCheckerFactory;
import org.apache.asterix.runtime.operators.joins.EndsIntervalMergeJoinCheckerFactory;
import org.apache.asterix.runtime.operators.joins.IIntervalMergeJoinCheckerFactory;
import org.apache.asterix.runtime.operators.joins.MeetsIntervalMergeJoinCheckerFactory;
import org.apache.asterix.runtime.operators.joins.MetByIntervalMergeJoinCheckerFactory;
import org.apache.asterix.runtime.operators.joins.OverlappedByIntervalMergeJoinCheckerFactory;
import org.apache.asterix.runtime.operators.joins.OverlappingIntervalMergeJoinCheckerFactory;
import org.apache.asterix.runtime.operators.joins.OverlapsIntervalMergeJoinCheckerFactory;
import org.apache.asterix.runtime.operators.joins.StartedByIntervalMergeJoinCheckerFactory;
import org.apache.asterix.runtime.operators.joins.StartsIntervalMergeJoinCheckerFactory;
import org.apache.asterix.runtime.operators.joins.intervalpartition.IntervalPartitionUtil;
import org.apache.commons.lang3.mutable.Mutable;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator;
import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag;
import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionAnnotation;
import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractBinaryJoinOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.AssignOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.logical.RangeForwardOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.AbstractJoinPOperator.JoinPartitioningType;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.AssignPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.MergeJoinPOperator;
import org.apache.hyracks.algebricks.core.algebra.operators.physical.RangeForwardPOperator;
import org.apache.hyracks.api.dataflow.value.IRangeMap;
import org.apache.hyracks.dataflow.std.base.RangeId;
import org.apache.hyracks.dataflow.std.join.IMergeJoinCheckerFactory;
public class JoinUtils {
private static final int LEFT = 0;
private static final int RIGHT = 1;
private static final Logger LOGGER = Logger.getLogger(JoinUtils.class.getName());
private static final Map<FunctionIdentifier, FunctionIdentifier> INTERVAL_JOIN_CONDITIONS = new HashMap<>();
static {
INTERVAL_JOIN_CONDITIONS.put(AsterixBuiltinFunctions.INTERVAL_AFTER, AsterixBuiltinFunctions.INTERVAL_BEFORE);
INTERVAL_JOIN_CONDITIONS.put(AsterixBuiltinFunctions.INTERVAL_BEFORE, AsterixBuiltinFunctions.INTERVAL_AFTER);
INTERVAL_JOIN_CONDITIONS.put(AsterixBuiltinFunctions.INTERVAL_COVERED_BY,
AsterixBuiltinFunctions.INTERVAL_COVERS);
INTERVAL_JOIN_CONDITIONS.put(AsterixBuiltinFunctions.INTERVAL_COVERS,
AsterixBuiltinFunctions.INTERVAL_COVERED_BY);
INTERVAL_JOIN_CONDITIONS.put(AsterixBuiltinFunctions.INTERVAL_ENDED_BY, AsterixBuiltinFunctions.INTERVAL_ENDS);
INTERVAL_JOIN_CONDITIONS.put(AsterixBuiltinFunctions.INTERVAL_ENDS, AsterixBuiltinFunctions.INTERVAL_ENDED_BY);
INTERVAL_JOIN_CONDITIONS.put(AsterixBuiltinFunctions.INTERVAL_MEETS, AsterixBuiltinFunctions.INTERVAL_MET_BY);
INTERVAL_JOIN_CONDITIONS.put(AsterixBuiltinFunctions.INTERVAL_MET_BY, AsterixBuiltinFunctions.INTERVAL_MEETS);
INTERVAL_JOIN_CONDITIONS.put(AsterixBuiltinFunctions.INTERVAL_OVERLAPPED_BY,
AsterixBuiltinFunctions.INTERVAL_OVERLAPS);
INTERVAL_JOIN_CONDITIONS.put(AsterixBuiltinFunctions.INTERVAL_OVERLAPPING,
AsterixBuiltinFunctions.INTERVAL_OVERLAPPING);
INTERVAL_JOIN_CONDITIONS.put(AsterixBuiltinFunctions.INTERVAL_OVERLAPS,
AsterixBuiltinFunctions.INTERVAL_OVERLAPPED_BY);
INTERVAL_JOIN_CONDITIONS.put(AsterixBuiltinFunctions.INTERVAL_STARTED_BY,
AsterixBuiltinFunctions.INTERVAL_STARTS);
INTERVAL_JOIN_CONDITIONS.put(AsterixBuiltinFunctions.INTERVAL_STARTS,
AsterixBuiltinFunctions.INTERVAL_STARTED_BY);
}
private JoinUtils() {
}
public static void setJoinAlgorithmAndExchangeAlgo(AbstractBinaryJoinOperator op, IOptimizationContext context)
throws AlgebricksException {
ILogicalExpression conditionLE = op.getCondition().getValue();
if (conditionLE.getExpressionTag() != LogicalExpressionTag.FUNCTION_CALL) {
return;
}
List<LogicalVariable> sideLeft = new LinkedList<>();
List<LogicalVariable> sideRight = new LinkedList<>();
List<LogicalVariable> varsLeft = op.getInputs().get(LEFT).getValue().getSchema();
List<LogicalVariable> varsRight = op.getInputs().get(RIGHT).getValue().getSchema();
AbstractFunctionCallExpression fexp = (AbstractFunctionCallExpression) conditionLE;
FunctionIdentifier fi = isIntervalJoinCondition(fexp, varsLeft, varsRight, sideLeft, sideRight);
if (fi != null) {
IntervalJoinExpressionAnnotation ijea = getIntervalJoinAnnotation(fexp);
if (ijea == null) {
// Use default join method.
return;
}
if (ijea.isMergeJoin()) {
// Sort Merge.
LOGGER.fine("Interval Join - Merge");
setSortMergeIntervalJoinOp(op, fi, sideLeft, sideRight, ijea, context);
} else if (ijea.isPartitionJoin()) {
// Overlapping Interval Partition.
LOGGER.fine("Interval Join - Cluster Parititioning");
setIntervalPartitionJoinOp(op, fi, sideLeft, sideRight, ijea, context);
} else if (ijea.isSpatialJoin()) {
// Spatial Partition.
LOGGER.fine("Interval Join - Spatial Partitioning");
} else if (ijea.isIndexJoin()) {
// Endpoint Index.
LOGGER.fine("Interval Join - Endpoint Index");
setIntervalIndexJoinOp(op, fi, sideLeft, sideRight, ijea, context);
}
}
}
private static IntervalJoinExpressionAnnotation getIntervalJoinAnnotation(AbstractFunctionCallExpression fexp) {
Iterator<IExpressionAnnotation> annotationIter = fexp.getAnnotations().values().iterator();
while (annotationIter.hasNext()) {
IExpressionAnnotation annotation = annotationIter.next();
if (annotation instanceof IntervalJoinExpressionAnnotation) {
return (IntervalJoinExpressionAnnotation) annotation;
}
}
return null;
}
private static void setSortMergeIntervalJoinOp(AbstractBinaryJoinOperator op, FunctionIdentifier fi,
List<LogicalVariable> sideLeft, List<LogicalVariable> sideRight, IntervalJoinExpressionAnnotation ijea,
IOptimizationContext context) throws AlgebricksException {
RangeId leftRangeId = context.newRangeId();
RangeId rightRangeId = context.newRangeId();
insertRangeForward(op, LEFT, leftRangeId, ijea.getRangeMap(), context);
insertRangeForward(op, RIGHT, rightRangeId, ijea.getRangeMap(), context);
IMergeJoinCheckerFactory mjcf = getIntervalMergeJoinCheckerFactory(fi, leftRangeId);
op.setPhysicalOperator(new MergeJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST, sideLeft,
sideRight, context.getPhysicalOptimizationConfig().getMaxFramesForJoin(), mjcf, leftRangeId,
rightRangeId, ijea.getRangeMap()));
}
private static void setIntervalPartitionJoinOp(AbstractBinaryJoinOperator op, FunctionIdentifier fi,
List<LogicalVariable> leftKeys, List<LogicalVariable> rightKeys, IntervalJoinExpressionAnnotation ijea,
IOptimizationContext context) throws AlgebricksException {
long leftCount = ijea.getLeftRecordCount() > 0 ? ijea.getLeftRecordCount() : getCardinality(leftKeys, context);
long rightCount = ijea.getRightRecordCount() > 0 ? ijea.getRightRecordCount()
: getCardinality(rightKeys, context);
long leftMaxDuration = ijea.getLeftMaxDuration() > 0 ? ijea.getLeftMaxDuration()
: getMaxDuration(leftKeys, context);
long rightMaxDuration = ijea.getRightMaxDuration() > 0 ? ijea.getRightMaxDuration()
: getMaxDuration(rightKeys, context);
int tuplesPerFrame = ijea.getTuplesPerFrame() > 0 ? ijea.getTuplesPerFrame()
: context.getPhysicalOptimizationConfig().getMaxRecordsPerFrame();
int k = IntervalPartitionUtil.determineK(leftCount, leftMaxDuration, rightCount, rightMaxDuration,
tuplesPerFrame);
if (k <= 2) {
k = 3;
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.warning("IntervalPartitionJoin has overridden the suggested value of k (" + k + ") with 3.");
}
}
RangeId leftRangeId = context.newRangeId();
RangeId rightRangeId = context.newRangeId();
insertRangeForward(op, LEFT, leftRangeId, ijea.getRangeMap(), context);
insertRangeForward(op, RIGHT, rightRangeId, ijea.getRangeMap(), context);
List<LogicalVariable> leftPartitionVar = Arrays.asList(context.newVar(), context.newVar());
List<LogicalVariable> rightPartitionVar = Arrays.asList(context.newVar(), context.newVar());
insertPartitionSortKey(op, LEFT, leftPartitionVar, leftKeys.get(0), leftRangeId, k, context);
insertPartitionSortKey(op, RIGHT, rightPartitionVar, rightKeys.get(0), rightRangeId, k, context);
IIntervalMergeJoinCheckerFactory mjcf = getIntervalMergeJoinCheckerFactory(fi, leftRangeId);
op.setPhysicalOperator(new IntervalPartitionJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST,
leftKeys, rightKeys, context.getPhysicalOptimizationConfig().getMaxFramesForJoin(), k, mjcf,
leftPartitionVar, rightPartitionVar, leftRangeId, rightRangeId, ijea.getRangeMap()));
}
private static void insertRangeForward(AbstractBinaryJoinOperator op, int branch, RangeId rangeId,
IRangeMap rangeMap, IOptimizationContext context) throws AlgebricksException {
RangeForwardOperator rfo = new RangeForwardOperator(rangeId, rangeMap);
rfo.setExecutionMode(op.getExecutionMode());
rfo.getInputs().add(op.getInputs().get(branch));
RangeForwardPOperator rfpo = new RangeForwardPOperator(rangeId, rangeMap);
rfo.setPhysicalOperator(rfpo);
Mutable<ILogicalOperator> rfoRef = new MutableObject<>(rfo);
op.getInputs().set(branch, rfoRef);
context.computeAndSetTypeEnvironmentForOperator(rfo);
}
private static void insertPartitionSortKey(AbstractBinaryJoinOperator op, int branch,
List<LogicalVariable> partitionVars, LogicalVariable intervalVar, RangeId rangeId, int k,
IOptimizationContext context) throws AlgebricksException {
MutableObject<ILogicalExpression> intervalExp = new MutableObject<>(
new VariableReferenceExpression(intervalVar));
MutableObject<ILogicalExpression> rangeIdConstant = new MutableObject<>(
new ConstantExpression(new AsterixConstantValue(new AInt32(rangeId.getId()))));
MutableObject<ILogicalExpression> kConstant = new MutableObject<>(
new ConstantExpression(new AsterixConstantValue(new AInt32(k))));
List<Mutable<ILogicalExpression>> assignExps = new ArrayList<>();
// Start partition
IFunctionInfo startFi = FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.INTERVAL_PARTITION_JOIN_START);
@SuppressWarnings("unchecked")
ScalarFunctionCallExpression startPartitionExp = new ScalarFunctionCallExpression(startFi, intervalExp,
rangeIdConstant, kConstant);
assignExps.add(new MutableObject<ILogicalExpression>(startPartitionExp));
// End partition
IFunctionInfo endFi = FunctionUtil.getFunctionInfo(AsterixBuiltinFunctions.INTERVAL_PARTITION_JOIN_END);
@SuppressWarnings("unchecked")
ScalarFunctionCallExpression endPartitionExp = new ScalarFunctionCallExpression(endFi, intervalExp,
rangeIdConstant, kConstant);
assignExps.add(new MutableObject<ILogicalExpression>(endPartitionExp));
AssignOperator ao = new AssignOperator(partitionVars, assignExps);
ao.setExecutionMode(op.getExecutionMode());
AssignPOperator apo = new AssignPOperator();
ao.setPhysicalOperator(apo);
Mutable<ILogicalOperator> aoRef = new MutableObject<>(ao);
ao.getInputs().add(op.getInputs().get(branch));
op.getInputs().set(branch, aoRef);
context.computeAndSetTypeEnvironmentForOperator(ao);
}
private static void setIntervalIndexJoinOp(AbstractBinaryJoinOperator op, FunctionIdentifier fi,
List<LogicalVariable> sideLeft, List<LogicalVariable> sideRight, IntervalJoinExpressionAnnotation ijea,
IOptimizationContext context) throws AlgebricksException {
RangeId leftRangeId = context.newRangeId();
RangeId rightRangeId = context.newRangeId();
insertRangeForward(op, LEFT, leftRangeId, ijea.getRangeMap(), context);
insertRangeForward(op, RIGHT, rightRangeId, ijea.getRangeMap(), context);
IIntervalMergeJoinCheckerFactory mjcf = getIntervalMergeJoinCheckerFactory(fi, leftRangeId);
op.setPhysicalOperator(new IntervalIndexJoinPOperator(op.getJoinKind(), JoinPartitioningType.BROADCAST,
sideLeft, sideRight, context.getPhysicalOptimizationConfig().getMaxFramesForJoin(), mjcf, leftRangeId,
rightRangeId, ijea.getRangeMap()));
}
private static int getMaxDuration(List<LogicalVariable> lv, IOptimizationContext context) {
// TODO Base on real statistics
return context.getPhysicalOptimizationConfig().getMaxIntervalDuration();
}
private static int getCardinality(List<LogicalVariable> lv, IOptimizationContext context) {
// TODO Base on real statistics
return context.getPhysicalOptimizationConfig().getMaxFramesForJoinLeftInput();
}
private static FunctionIdentifier isIntervalJoinCondition(ILogicalExpression e,
Collection<LogicalVariable> inLeftAll, Collection<LogicalVariable> inRightAll,
Collection<LogicalVariable> outLeftFields, Collection<LogicalVariable> outRightFields) {
FunctionIdentifier fiReturn;
boolean switchArguments = false;
if (e.getExpressionTag() == LogicalExpressionTag.FUNCTION_CALL) {
AbstractFunctionCallExpression fexp = (AbstractFunctionCallExpression) e;
FunctionIdentifier fi = fexp.getFunctionIdentifier();
if (isIntervalFunction(fi)) {
fiReturn = fi;
} else {
return null;
}
ILogicalExpression opLeft = fexp.getArguments().get(LEFT).getValue();
ILogicalExpression opRight = fexp.getArguments().get(RIGHT).getValue();
if (opLeft.getExpressionTag() != LogicalExpressionTag.VARIABLE
|| opRight.getExpressionTag() != LogicalExpressionTag.VARIABLE) {
return null;
}
LogicalVariable var1 = ((VariableReferenceExpression) opLeft).getVariableReference();
if (inLeftAll.contains(var1) && !outLeftFields.contains(var1)) {
outLeftFields.add(var1);
} else if (inRightAll.contains(var1) && !outRightFields.contains(var1)) {
outRightFields.add(var1);
fiReturn = reverseIntervalExpression(fi);
switchArguments = true;
} else {
return null;
}
LogicalVariable var2 = ((VariableReferenceExpression) opRight).getVariableReference();
if (inLeftAll.contains(var2) && !outLeftFields.contains(var2) && switchArguments) {
outLeftFields.add(var2);
} else if (inRightAll.contains(var2) && !outRightFields.contains(var2) && !switchArguments) {
outRightFields.add(var2);
} else {
return null;
}
return fiReturn;
} else {
return null;
}
}
private static IIntervalMergeJoinCheckerFactory getIntervalMergeJoinCheckerFactory(FunctionIdentifier fi,
RangeId rangeId) {
IIntervalMergeJoinCheckerFactory mjcf = new OverlappingIntervalMergeJoinCheckerFactory(rangeId);
if (fi.equals(AsterixBuiltinFunctions.INTERVAL_OVERLAPPED_BY)) {
mjcf = new OverlappedByIntervalMergeJoinCheckerFactory();
} else if (fi.equals(AsterixBuiltinFunctions.INTERVAL_OVERLAPS)) {
mjcf = new OverlapsIntervalMergeJoinCheckerFactory();
} else if (fi.equals(AsterixBuiltinFunctions.INTERVAL_COVERS)) {
mjcf = new CoversIntervalMergeJoinCheckerFactory();
} else if (fi.equals(AsterixBuiltinFunctions.INTERVAL_COVERED_BY)) {
mjcf = new CoveredByIntervalMergeJoinCheckerFactory();
} else if (fi.equals(AsterixBuiltinFunctions.INTERVAL_STARTS)) {
mjcf = new StartsIntervalMergeJoinCheckerFactory();
} else if (fi.equals(AsterixBuiltinFunctions.INTERVAL_STARTED_BY)) {
mjcf = new StartedByIntervalMergeJoinCheckerFactory();
} else if (fi.equals(AsterixBuiltinFunctions.INTERVAL_ENDS)) {
mjcf = new EndsIntervalMergeJoinCheckerFactory();
} else if (fi.equals(AsterixBuiltinFunctions.INTERVAL_ENDED_BY)) {
mjcf = new EndedByIntervalMergeJoinCheckerFactory();
} else if (fi.equals(AsterixBuiltinFunctions.INTERVAL_MEETS)) {
mjcf = new MeetsIntervalMergeJoinCheckerFactory();
} else if (fi.equals(AsterixBuiltinFunctions.INTERVAL_MET_BY)) {
mjcf = new MetByIntervalMergeJoinCheckerFactory();
} else if (fi.equals(AsterixBuiltinFunctions.INTERVAL_BEFORE)) {
mjcf = new BeforeIntervalMergeJoinCheckerFactory();
} else if (fi.equals(AsterixBuiltinFunctions.INTERVAL_AFTER)) {
mjcf = new AfterIntervalMergeJoinCheckerFactory();
}
return mjcf;
}
private static boolean isIntervalFunction(FunctionIdentifier fi) {
return INTERVAL_JOIN_CONDITIONS.containsKey(fi);
}
private static FunctionIdentifier reverseIntervalExpression(FunctionIdentifier fi) {
if (INTERVAL_JOIN_CONDITIONS.containsKey(fi)) {
return INTERVAL_JOIN_CONDITIONS.get(fi);
}
return null;
}
}