/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.sql.calcite.rel;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.plan.volcano.RelSubset;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.sql.SqlKind;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.JoinDataSource;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.join.JoinType;
import org.apache.druid.sql.calcite.expression.DruidExpression;
import org.apache.druid.sql.calcite.expression.Expressions;
import org.apache.druid.sql.calcite.planner.Calcites;
import org.apache.druid.sql.calcite.table.RowSignatures;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
* DruidRel that uses a {@link JoinDataSource}.
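* Wraps a Calcite {@link Join} whose inputs are {@link DruidRel}s, together with a {@link PartialDruidQuery}
* stacked on top of it.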
*/
public class DruidJoinQueryRel extends DruidRel<DruidJoinQueryRel>
{
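// Placeholder datasource used for the explain-only query built by toDruidQueryForExplaining().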
private static final TableDataSource DUMMY_DATA_SOURCE = new TableDataSource("__join__");

private final PartialDruidQuery partialQuery;
private final Join joinRel;

private RelNode left;
private RelNode right;

private DruidJoinQueryRel(
RelOptCluster cluster,
RelTraitSet traitSet,
Join joinRel,
PartialDruidQuery partialQuery,
QueryMaker queryMaker
)
{
super(cluster, traitSet, queryMaker);
this.joinRel = joinRel;
this.left = joinRel.getLeft();
this.right = joinRel.getRight();
this.partialQuery = partialQuery;
}

/**
* Create an instance from a Join that is based on two {@link DruidRel} inputs.
*/
public static DruidJoinQueryRel create(
final Join joinRel,
final QueryMaker queryMaker
)
{
return new DruidJoinQueryRel(
joinRel.getCluster(),
joinRel.getTraitSet(),
joinRel,
PartialDruidQuery.create(joinRel),
queryMaker
);
}

@Override
public PartialDruidQuery getPartialDruidQuery()
{
return partialQuery;
}

@Override
public Sequence<Object[]> runQuery()
{
// runQuery doesn't need to finalize aggregations, because the fact that runQuery is happening suggests this
// is the outermost query and it will actually get run as a native query. Druid's native query layer will
// finalize aggregations for the outermost query even if we don't explicitly ask it to.
final DruidQuery query = toDruidQuery(false);
return getQueryMaker().runQuery(query);
}
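
/**
* Returns a copy of this rel with "partialQuery" swapped out for the given one, and with the new query's
* traits added to the trait set.
*/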
@Override
public DruidJoinQueryRel withPartialQuery(final PartialDruidQuery newQueryBuilder)
{
return new DruidJoinQueryRel(
getCluster(),
getTraitSet().plusAll(newQueryBuilder.getRelTraits()),
joinRel,
newQueryBuilder,
getQueryMaker()
);
}
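
/**
* Builds the native query for this join. Each input is converted to a {@link DruidQuery}; an input that cannot
* be used as a plain datasource is wrapped in a {@link QueryDataSource}. The two sides are then combined into a
* {@link JoinDataSource} using the join condition translated into a Druid expression.
*/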
@Override
public DruidQuery toDruidQuery(final boolean finalizeAggregations)
{
final DruidRel<?> leftDruidRel = (DruidRel<?>) left;
final DruidQuery leftQuery = Preconditions.checkNotNull(leftDruidRel.toDruidQuery(false), "leftQuery");
final RowSignature leftSignature = leftQuery.getOutputRowSignature();
final DataSource leftDataSource;
final DruidRel<?> rightDruidRel = (DruidRel<?>) right;
final DruidQuery rightQuery = Preconditions.checkNotNull(rightDruidRel.toDruidQuery(false), "rightQuery");
final RowSignature rightSignature = rightQuery.getOutputRowSignature();
final DataSource rightDataSource;
if (computeLeftRequiresSubquery(leftDruidRel)) {
leftDataSource = new QueryDataSource(leftQuery.getQuery());
} else {
leftDataSource = leftQuery.getDataSource();
}
if (computeRightRequiresSubquery(rightDruidRel)) {
rightDataSource = new QueryDataSource(rightQuery.getQuery());
} else {
rightDataSource = rightQuery.getDataSource();
}
final Pair<String, RowSignature> prefixSignaturePair = computeJoinRowSignature(leftSignature, rightSignature);
// Generate the condition for this join as a Druid expression.
final DruidExpression condition = Expressions.toDruidExpression(
getPlannerContext(),
prefixSignaturePair.rhs,
joinRel.getCondition()
);
// DruidJoinRule should not have created us if "condition" is null. Check defensively anyway, which also
// quiets static code analysis.
if (condition == null) {
throw new CannotBuildQueryException(joinRel, joinRel.getCondition());
}
return partialQuery.build(
JoinDataSource.create(
leftDataSource,
rightDataSource,
prefixSignaturePair.lhs,
condition.getExpression(),
toDruidJoinType(joinRel.getJoinType()),
getPlannerContext().getExprMacroTable()
),
prefixSignaturePair.rhs,
getPlannerContext(),
getCluster().getRexBuilder(),
finalizeAggregations
);
}
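
/**
* Builds a query on the placeholder {@link #DUMMY_DATA_SOURCE} rather than the real join datasource; used for
* EXPLAIN output (see {@link #explainTerms}).
*/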
@Override
public DruidQuery toDruidQueryForExplaining()
{
return partialQuery.build(
DUMMY_DATA_SOURCE,
RowSignatures.fromRelDataType(
joinRel.getRowType().getFieldNames(),
joinRel.getRowType()
),
getPlannerContext(),
getCluster().getRexBuilder(),
false
);
}
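
/**
* Returns a copy of this rel in {@link DruidConvention}, with both join inputs converted to that convention.
*/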
@Override
public DruidJoinQueryRel asDruidConvention()
{
return new DruidJoinQueryRel(
getCluster(),
getTraitSet().replace(DruidConvention.instance()),
joinRel.copy(
joinRel.getTraitSet(),
joinRel.getInputs()
.stream()
.map(input -> RelOptRule.convert(input, DruidConvention.instance()))
.collect(Collectors.toList())
),
partialQuery,
getQueryMaker()
);
}

@Override
public List<RelNode> getInputs()
{
return ImmutableList.of(left, right);
}
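
/**
* Replaces an input of the wrapped {@link Join} and keeps the cached "left"/"right" references in sync.
*/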
@Override
public void replaceInput(int ordinalInParent, RelNode p)
{
joinRel.replaceInput(ordinalInParent, p);
if (ordinalInParent == 0) {
this.left = p;
} else if (ordinalInParent == 1) {
this.right = p;
} else {
throw new IndexOutOfBoundsException(StringUtils.format("Invalid ordinalInParent[%s]", ordinalInParent));
}
}

@Override
public List<RexNode> getChildExps()
{
return ImmutableList.of(joinRel.getCondition());
}

@Override
public RelNode copy(final RelTraitSet traitSet, final List<RelNode> inputs)
{
return new DruidJoinQueryRel(
getCluster(),
traitSet,
joinRel.copy(joinRel.getTraitSet(), inputs),
getPartialDruidQuery(),
getQueryMaker()
);
}
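
/**
* Returns the union of datasource names used by the left and right inputs.
*/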
@Override
public Set<String> getDataSourceNames()
{
final Set<String> retVal = new HashSet<>();
retVal.addAll(((DruidRel<?>) left).getDataSourceNames());
retVal.addAll(((DruidRel<?>) right).getDataSourceNames());
return retVal;
}
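
/**
* Includes the JSON-serialized form of the explain-only query (see {@link #toDruidQueryForExplaining}) and its
* output row signature in the plan output.
*/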
@Override
public RelWriter explainTerms(RelWriter pw)
{
final String queryString;
final DruidQuery druidQuery = toDruidQueryForExplaining();
try {
queryString = getQueryMaker().getJsonMapper().writeValueAsString(druidQuery.getQuery());
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
return joinRel.explainTerms(pw)
.item("query", queryString)
.item("signature", druidQuery.getOutputRowSignature());
}

@Override
protected RelDataType deriveRowType()
{
return partialQuery.getRowType();
}
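
/**
* Cost model: start from the partial query's estimated cost, or {@link CostEstimates#COST_JOIN_SUBQUERY} if the
* left side must run as a subquery; add the same subquery cost if the right side must run as a subquery; and add
* {@link CostEstimates#COST_JOIN_CROSS} when the join condition is a literal that is not always false (i.e. a
* cross join).
*/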
@Override
public RelOptCost computeSelfCost(final RelOptPlanner planner, final RelMetadataQuery mq)
{
double cost;
if (computeLeftRequiresSubquery(getSomeDruidChild(left))) {
cost = CostEstimates.COST_JOIN_SUBQUERY;
} else {
cost = partialQuery.estimateCost();
}
if (computeRightRequiresSubquery(getSomeDruidChild(right))) {
cost += CostEstimates.COST_JOIN_SUBQUERY;
}
if (joinRel.getCondition().isA(SqlKind.LITERAL) && !joinRel.getCondition().isAlwaysFalse()) {
cost += CostEstimates.COST_JOIN_CROSS;
}
return planner.getCostFactory().makeCost(cost, 0, 0);
}
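
/**
* Maps a Calcite {@link JoinRelType} to the corresponding Druid {@link JoinType}, throwing {@link IAE} for any
* join type that is not supported.
*/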
private static JoinType toDruidJoinType(JoinRelType calciteJoinType)
{
switch (calciteJoinType) {
case LEFT:
return JoinType.LEFT;
case RIGHT:
return JoinType.RIGHT;
case FULL:
return JoinType.FULL;
case INNER:
return JoinType.INNER;
default:
throw new IAE("Cannot handle joinType[%s]", calciteJoinType);
}
}

private static boolean computeLeftRequiresSubquery(final DruidRel<?> left)
{
// Left requires a subquery unless it's a scan or mapping on top of any table or a join.
return !DruidRels.isScanOrMapping(left, true);
}

private static boolean computeRightRequiresSubquery(final DruidRel<?> right)
{
// Right requires a subquery unless it's a scan or mapping on top of a global datasource.
// Ideally this would involve JoinableFactory.isDirectlyJoinable to check that the global datasources
// are in fact joinable, but for now isGlobal is coupled to joinability.
return !(DruidRels.isScanOrMapping(right, false)
&& DruidRels.druidTableIfLeafRel(right).filter(table -> table.getDataSource().isGlobal()).isPresent());
}

/**
* Returns a Pair of "rightPrefix" (for JoinDataSource) and the signature of rows that will result from
* applying that prefix.
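*
* For example, if the left signature is [x, y] and the right signature is [x, z], and "j0." is the chosen
* prefix, the resulting signature is [x, y, j0.x, j0.z].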
*/
private static Pair<String, RowSignature> computeJoinRowSignature(
final RowSignature leftSignature,
final RowSignature rightSignature
)
{
final RowSignature.Builder signatureBuilder = RowSignature.builder();
for (final String column : leftSignature.getColumnNames()) {
signatureBuilder.add(column, leftSignature.getColumnType(column).orElse(null));
}
// Need to include the "0" since findUnusedPrefixForDigits only guarantees safety for digit-initiated suffixes
final String rightPrefix = Calcites.findUnusedPrefixForDigits("j", leftSignature.getColumnNames()) + "0.";
for (final String column : rightSignature.getColumnNames()) {
signatureBuilder.add(rightPrefix + column, rightSignature.getColumnType(column).orElse(null));
}
return Pair.of(rightPrefix, signatureBuilder.build());
}
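
/**
* Returns the child as a {@link DruidRel}. While planning, the child may be a {@link RelSubset}; in that case,
* the first rel from the subset is returned.
*/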
private static DruidRel<?> getSomeDruidChild(final RelNode child)
{
if (child instanceof DruidRel) {
return (DruidRel<?>) child;
} else {
final RelSubset subset = (RelSubset) child;
return (DruidRel<?>) Iterables.getFirst(subset.getRels(), null);
}
}
}