blob: 9e568ad7a3aa353862984f788c30e9d22a2771e4 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.planner;
import java.util.ArrayList;
import java.util.List;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.analysis.BinaryPredicate;
import org.apache.impala.analysis.Expr;
import org.apache.impala.analysis.ExprSubstitutionMap;
import org.apache.impala.analysis.JoinOperator;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.InternalException;
import org.apache.impala.common.Pair;
import org.apache.impala.thrift.TEqJoinCondition;
import org.apache.impala.thrift.TExplainLevel;
import org.apache.impala.thrift.THashJoinNode;
import org.apache.impala.thrift.TPlanNode;
import org.apache.impala.thrift.TPlanNodeType;
import org.apache.impala.thrift.TQueryOptions;
import org.apache.impala.util.BitUtil;
/**
 * Hash join between left child (outer) and right child (inner). One child must be the
 * plan generated for a table ref. Typically, that is the right child, but due to join
 * inversion (for outer/semi/cross joins) it could also be the left child.
 */
public class HashJoinNode extends JoinNode {
// Creates a hash join over the given outer and inner child plans. Every argument is
// forwarded unchanged to the JoinNode constructor, together with the fixed display
// name "HASH JOIN".
public HashJoinNode(PlanNode outer, PlanNode inner, boolean isStraightJoin,
DistributionMode distrMode, JoinOperator joinOp,
List<BinaryPredicate> eqJoinConjuncts, List<Expr> otherJoinConjuncts) {
super(outer, inner, isStraightJoin, distrMode, joinOp, eqJoinConjuncts,
otherJoinConjuncts, "HASH JOIN");
// Guard: this node must never be constructed for a CROSS JOIN.
Preconditions.checkState(joinOp_ != JoinOperator.CROSS_JOIN);
// Identifies this node as a blocking join node; the answer is always true.
public boolean isBlockingJoinNode() {
  return true;
}
// Returns the equi-join conjuncts held by this node. The list itself is returned,
// not a copy.
public List<BinaryPredicate> getEqJoinConjuncts() {
  return eqJoinConjuncts_;
}
// Rewrites the equi-join conjuncts: each one is substituted with the combined child
// substitution map, and when its two operand types do not match, casts to a common
// type are inserted. The rewritten conjuncts replace eqJoinConjuncts_ at the end.
// Throws InternalException when the operand types cannot be reconciled; the signature
// permits any ImpalaException.
public void init(Analyzer analyzer) throws ImpalaException {
List<BinaryPredicate> newEqJoinConjuncts = new ArrayList<>();
ExprSubstitutionMap combinedChildSmap = getCombinedChildSmap();
for (Expr c: eqJoinConjuncts_) {
// The substituted expression is cast back to BinaryPredicate, so it is expected to
// keep that type.
BinaryPredicate eqPred =
(BinaryPredicate) c.substitute(combinedChildSmap, analyzer, false);
Type t0 = eqPred.getChild(0).getType();
Type t1 = eqPred.getChild(1).getType();
if (!t0.matchesType(t1)) {
// With decimal and char types, the child types do not have to match because
// the equality builtin handles it. However, they will not hash correctly so
// insert a cast.
boolean bothDecimal = t0.isDecimal() && t1.isDecimal();
boolean bothString = t0.isStringType() && t1.isStringType();
// Any mismatch other than decimal/decimal or string/string is rejected.
if (!bothDecimal && !bothString) {
throw new InternalException("Cannot compare " +
t0.toSql() + " to " + t1.toSql() + " in join predicate.");
// Look up a type that both operands can be cast to; an invalid result is reported
// below as a cast that would lose precision.
Type compatibleType = Type.getAssignmentCompatibleType(
t0, t1, false, analyzer.isDecimalV2());
if (compatibleType.isInvalid()) {
// NOTE(review): the message below reads "Unable create"; it probably should say
// "Unable to create". Left untouched here because it is a runtime string.
throw new InternalException(String.format(
"Unable create a hash join with equi-join predicate %s " +
"because the operands cannot be cast without loss of precision. " +
"Operand types: %s = %s.", eqPred.toSql(), t0.toSql(), t1.toSql()));
Preconditions.checkState(compatibleType.isDecimal() ||
try {
// Cast only the operand(s) whose type differs from the compatible type.
if (!t0.equals(compatibleType)) {
eqPred.setChild(0, eqPred.getChild(0).castTo(compatibleType));
if (!t1.equals(compatibleType)) {
eqPred.setChild(1, eqPred.getChild(1).castTo(compatibleType));
} catch (AnalysisException e) {
// A failing cast is not expected at this point; report it as an internal error and
// keep the original exception as the cause.
throw new InternalException("Should not happen", e);
// Build a fresh predicate with the same operator from the (possibly cast) operands.
BinaryPredicate newEqPred = new BinaryPredicate(eqPred.getOp(),
eqPred.getChild(0), eqPred.getChild(1));
// Swap in the rewritten conjunct list.
eqJoinConjuncts_ = newEqJoinConjuncts;
// Builds a debug string for this node with a toStringHelper; the equi-join conjuncts
// are included under the key "eqJoinConjuncts_".
protected String debugString() {
return Objects.toStringHelper(this)
.add("eqJoinConjuncts_", eqJoinConjunctsDebugString())
// Renders the equi-join conjuncts for debugging: for every conjunct, child 0 is added
// under the key "lhs" and child 1 under the key "rhs".
private String eqJoinConjunctsDebugString() {
Objects.ToStringHelper helper = Objects.toStringHelper(this);
for (Expr entry: eqJoinConjuncts_) {
helper.add("lhs" , entry.getChild(0)).add("rhs", entry.getChild(1));
return helper.toString();
// Populates the thrift plan node for this join: marks it as HASH_JOIN_NODE, attaches
// the join-node struct produced by joinNodeToThrift(), and creates an empty
// THashJoinNode on that struct.
protected void toThrift(TPlanNode msg) {
msg.node_type = TPlanNodeType.HASH_JOIN_NODE;
msg.join_node = joinNodeToThrift();
msg.join_node.hash_join_node = new THashJoinNode();
// Visit each of the other (non equi-join) conjuncts.
for (Expr e: otherJoinConjuncts_) {
/**
 * Helper to get the equi-join conjuncts in a thrift representation.
 */
// Converts the equi-join conjuncts to TEqJoinCondition objects and returns them in a
// list that is presized to the number of conjuncts.
public List<TEqJoinCondition> getThriftEquiJoinConjuncts() {
List<TEqJoinCondition> equiJoinConjuncts = new ArrayList<>(eqJoinConjuncts_.size());
for (Expr entry : eqJoinConjuncts_) {
// Every equi-join conjunct is treated as a BinaryPredicate.
BinaryPredicate bp = (BinaryPredicate)entry;
// The condition is built from the thrift form of the predicate's operand(s); its last
// argument is true only when the operator is NOT_DISTINCT.
TEqJoinCondition eqJoinCondition =
new TEqJoinCondition(bp.getChild(0).treeToThrift(),
bp.getOp() == BinaryPredicate.Operator.NOT_DISTINCT);
return equiJoinConjuncts;
// Builds the explain-plan text for this node. The first line starts with prefix and
// the display label; the detail lines that follow each start with detailPrefix and
// are included depending on detailLevel. Returns the assembled text.
protected String getNodeExplainString(String prefix, String detailPrefix,
TExplainLevel detailLevel) {
StringBuilder output = new StringBuilder();
output.append(String.format("%s%s [%s]\n", prefix, getDisplayLabel(),
// Above STANDARD: report the hash table id, but only when the id is valid.
if (detailLevel.ordinal() > TExplainLevel.STANDARD.ordinal()) {
if (joinTableId_.isValid()) {
detailPrefix + "hash-table-id=" + joinTableId_.toString() + "\n");
// Above MINIMAL: list the hash (equi-join) predicates.
if (detailLevel.ordinal() > TExplainLevel.MINIMAL.ordinal()) {
output.append(detailPrefix + "hash predicates: ");
for (int i = 0; i < eqJoinConjuncts_.size(); ++i) {
Expr eqConjunct = eqJoinConjuncts_.get(i);
// Separate entries with ", "; nothing is appended after the last entry.
if (i + 1 != eqJoinConjuncts_.size()) output.append(", ");
// Optionally print FK/PK equi-join conjuncts.
// Only for inner and outer joins, and only above STANDARD.
if (joinOp_.isInnerJoin() || joinOp_.isOuterJoin()) {
if (detailLevel.ordinal() > TExplainLevel.STANDARD.ordinal()) {
output.append(detailPrefix + "fk/pk conjuncts: ");
// A null list, an empty list and a non-empty list are reported differently; an
// empty list is printed as "assumed fk/pk", a non-empty one as a comma-separated list.
if (fkPkEqJoinConjuncts_ == null) {
} else if (fkPkEqJoinConjuncts_.isEmpty()) {
output.append("assumed fk/pk");
} else {
output.append(Joiner.on(", ").join(fkPkEqJoinConjuncts_));
// The remaining sections are printed only when the corresponding list is non-empty.
if (!otherJoinConjuncts_.isEmpty()) {
output.append(detailPrefix + "other join predicates: ")
.append(Expr.getExplainString(otherJoinConjuncts_, detailLevel) + "\n");
if (!conjuncts_.isEmpty()) {
output.append(detailPrefix + "other predicates: ")
.append(Expr.getExplainString(conjuncts_, detailLevel) + "\n");
if (!runtimeFilters_.isEmpty()) {
output.append(detailPrefix + "runtime filters: ");
output.append(getRuntimeFilterExplainString(true, detailLevel));
return output.toString();
// Estimates the resources this join needs per fragment instance and returns them as a
// pair: the probe profile first, the build profile second.
public Pair<ResourceProfile, ResourceProfile> computeJoinResourceProfile(
TQueryOptions queryOptions) {
long perInstanceMemEstimate;
long perInstanceDataBytes;
int numInstances = fragment_.getNumInstances(queryOptions.getMt_dop());
// If child 1 has no cardinality or average row size (-1), or the instance count is not
// positive, fall back to the default estimate and mark the data size as unknown (-1).
if (getChild(1).getCardinality() == -1 || getChild(1).getAvgRowSize() == -1
|| numInstances <= 0) {
perInstanceMemEstimate = DEFAULT_PER_INSTANCE_MEM;
perInstanceDataBytes = -1;
} else {
// TODO: IMPALA-4224: update this once we can share the broadcast join data between
// finstances. Currently this implicitly assumes that each instance has a copy of
// the hash tables.
// Data size of child 1: cardinality times average row size, rounded up.
perInstanceDataBytes = (long) Math.ceil(getChild(1).cardinality_
* getChild(1).avgRowSize_);
// Assume the rows are evenly divided among instances.
if (distrMode_ == DistributionMode.PARTITIONED) {
perInstanceDataBytes /= numInstances;
// Scale the data size by the hash table space overhead factor, rounded up.
perInstanceMemEstimate = (long) Math.ceil(
perInstanceDataBytes * PlannerContext.HASH_TBL_SPACE_OVERHEAD);
// Must be kept in sync with PartitionedHashJoinBuilder::MinReservation() in be.
final int PARTITION_FANOUT = 16;
// Fanout + 1 buffers, plus one more for a null-aware left anti join.
long minBuildBuffers = PARTITION_FANOUT + 1
+ (joinOp_ == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN ? 1 : 0);
long bufferSize = queryOptions.getDefault_spillable_buffer_size();
// The buffer size is only adjusted when the data size is known.
if (perInstanceDataBytes != -1) {
long bytesPerBuffer = perInstanceDataBytes / PARTITION_FANOUT;
// Scale down the buffer size if we think there will be excess free space with the
// default buffer size, e.g. if the right side is a small dimension table.
bufferSize = Math.min(bufferSize, Math.max(
// Two of the buffers need to be buffers large enough to hold the maximum-sized row
// to serve as input and output buffers while repartitioning.
long maxRowBufferSize =
computeMaxSpillableBufferSize(bufferSize, queryOptions.getMax_row_size());
// All but two of the minimum build buffers use bufferSize; the other two use the
// max-row buffer size.
long perInstanceBuildMinMemReservation =
bufferSize * (minBuildBuffers - 2) + maxRowBufferSize * 2;
// Most reservation for probe buffers is obtained from the join builder when
// spilling. However, for NAAJ, two additional probe streams are needed that
// are used exclusively by the probe side.
long perInstanceProbeMinMemReservation = 0;
if (joinOp_ == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) {
// Only one of the NAAJ probe streams is written at a time, so it needs a max-sized
// buffer. If the build is integrated, we already have a max sized buffer accounted
// for in the build reservation.
perInstanceProbeMinMemReservation =
hasSeparateBuild() ? maxRowBufferSize + bufferSize : bufferSize * 2;
// Almost all resource consumption is in the build, or shared between the build and
// the probe. These are accounted for in the build profile.
ResourceProfile probeProfile = new ResourceProfileBuilder()
ResourceProfile buildProfile = new ResourceProfileBuilder()
return Pair.create(probeProfile, buildProfile);