blob: a0a633535dfa58493a8b5a3d9e4a1cbd6d2d68cb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.common;
import java.io.IOException;
import java.util.List;
import com.sun.codemodel.JExpression;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.ValueExpressions;
import org.apache.drill.common.logical.data.NamedExpression;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.compile.sig.GeneratorMapping;
import org.apache.drill.exec.compile.sig.MappingSet;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.expr.ValueVectorReadExpression;
import org.apache.drill.exec.expr.ValueVectorWriteExpression;
import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.impl.join.JoinUtils;
import org.apache.drill.exec.planner.physical.HashPrelUtil;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.vector.ValueVector;
import com.sun.codemodel.JConditional;
import com.sun.codemodel.JExpr;
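/**
* Code generator for {@link HashTable}s: from a {@link HashTableConfig} and the incoming batches it
* generates the key-matching, key-storing, key-output and hashing methods of a hash table, then
* compiles and sets up the resulting {@link HashTable} implementation.
*/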
public class ChainedHashTable {
// Class-wide SLF4J logger.
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ChainedHashTable.class);
// ---------------------------------------------------------------------------------------------
// Generator mappings. Each mapping names the generated method that receives setup code and the
// generated method that receives evaluation code; none of them uses a reset or cleanup method.
// ---------------------------------------------------------------------------------------------
// Key comparison for the build side: eval code lands in "isKeyMatchInternalBuild".
private static final GeneratorMapping KEY_MATCH_BUILD =
GeneratorMapping.create("setupInterior" /* setup method */, "isKeyMatchInternalBuild" /* eval method */,
null /* reset */, null /* cleanup */);
// "Are both keys null" check: eval code lands in "areBothKeysNull".
private static final GeneratorMapping BOTH_KEYS_NULL =
GeneratorMapping.create("setupInterior" /* setup method */, "areBothKeysNull" /* eval method */,
null /* reset */, null /* cleanup */);
// Key comparison for the probe side: eval code lands in "isKeyMatchInternalProbe".
private static final GeneratorMapping KEY_MATCH_PROBE =
GeneratorMapping.create("setupInterior" /* setup method */, "isKeyMatchInternalProbe" /* eval method */,
null /* reset */, null /* cleanup */);
// Hash computation over build-side keys: setup goes to "doSetup", eval code to "getHashBuild".
private static final GeneratorMapping GET_HASH_BUILD =
GeneratorMapping.create("doSetup" /* setup method */, "getHashBuild" /* eval method */,
null /* reset */, null /* cleanup */);
// Hash computation over probe-side keys: setup goes to "doSetup", eval code to "getHashProbe".
private static final GeneratorMapping GET_HASH_PROBE =
GeneratorMapping.create("doSetup" /* setup method */, "getHashProbe" /* eval method */, null /* reset */,
null /* cleanup */);
// Copying key values into the hash table container: eval code lands in "setValue".
private static final GeneratorMapping SET_VALUE =
GeneratorMapping.create("setupInterior" /* setup method */, "setValue" /* eval method */, null /* reset */,
null /* cleanup */);
// Copying stored keys to the outgoing container: eval code lands in "outputRecordKeys".
private static final GeneratorMapping OUTPUT_KEYS =
GeneratorMapping.create("setupInterior" /* setup method */, "outputRecordKeys" /* eval method */,
null /* reset */, null /* cleanup */);
// GM for putting constant expression into method "setupInterior"
// (setup and eval both point at "setupInterior"; used by every mapping set whose setup method is "setupInterior")
private static final GeneratorMapping SETUP_INTERIOR_CONSTANT =
GeneratorMapping.create("setupInterior" /* setup method */, "setupInterior" /* eval method */,
null /* reset */, null /* cleanup */);
// GM for putting constant expression into method "doSetup"
// (setup and eval both point at "doSetup"; used by the two getHash mapping sets)
private static final GeneratorMapping DO_SETUP_CONSTANT =
GeneratorMapping.create("doSetup" /* setup method */, "doSetup" /* eval method */, null /* reset */,
null /* cleanup */);
// ---------------------------------------------------------------------------------------------
// Mapping sets. The four leading arguments are, in order: read index, write index, read container
// and write container (see the labelled arguments on SetValueMapping); the trailing arguments are
// the generator mappings defined above. The key-match and getHash sets only read, so their write
// index and write container are null.
// ---------------------------------------------------------------------------------------------
// Incoming build row, for the build-side key comparison (isKeyMatchInternalBuild).
private final MappingSet KeyMatchIncomingBuildMapping =
new MappingSet("incomingRowIdx", null, "incomingBuild", null, SETUP_INTERIOR_CONSTANT, KEY_MATCH_BUILD);
// Incoming build row, for the both-keys-null check (areBothKeysNull).
private final MappingSet bothKeysNullIncomingBuildMapping =
new MappingSet("incomingRowIdx", null, "incomingBuild", null, SETUP_INTERIOR_CONSTANT, BOTH_KEYS_NULL);
// Incoming probe row, for the probe-side key comparison (isKeyMatchInternalProbe).
private final MappingSet KeyMatchIncomingProbeMapping =
new MappingSet("incomingRowIdx", null, "incomingProbe", null, SETUP_INTERIOR_CONSTANT, KEY_MATCH_PROBE);
// Key already stored in the hash table container, for the build-side key comparison.
private final MappingSet KeyMatchHtableMapping =
new MappingSet("htRowIdx", null, "htContainer", null, SETUP_INTERIOR_CONSTANT, KEY_MATCH_BUILD);
// Key already stored in the hash table container, for the both-keys-null check.
private final MappingSet bothKeysNullHtableMapping =
new MappingSet("htRowIdx", null, "htContainer", null, SETUP_INTERIOR_CONSTANT, BOTH_KEYS_NULL);
// Key already stored in the hash table container, for the probe-side key comparison.
private final MappingSet KeyMatchHtableProbeMapping =
new MappingSet("htRowIdx", null, "htContainer", null, SETUP_INTERIOR_CONSTANT, KEY_MATCH_PROBE);
// Incoming build row, for hash computation (getHashBuild).
private final MappingSet GetHashIncomingBuildMapping =
new MappingSet("incomingRowIdx", null, "incomingBuild", null, DO_SETUP_CONSTANT, GET_HASH_BUILD);
// Incoming probe row, for hash computation (getHashProbe).
private final MappingSet GetHashIncomingProbeMapping =
new MappingSet("incomingRowIdx", null, "incomingProbe", null, DO_SETUP_CONSTANT, GET_HASH_PROBE);
// Reads a key from the incoming build row and writes it into the hash table container (setValue).
private final MappingSet SetValueMapping =
new MappingSet("incomingRowIdx" /* read index */, "htRowIdx" /* write index */,
"incomingBuild" /* read container */, "htContainer" /* write container */, SETUP_INTERIOR_CONSTANT,
SET_VALUE);
// Reads a stored key from the hash table container and writes it to the outgoing container
// (outputRecordKeys).
private final MappingSet OutputRecordKeysMapping =
new MappingSet("htRowIdx" /* read index */, "outRowIdx" /* write index */, "htContainer" /* read container */,
"outgoing" /* write container */, SETUP_INTERIOR_CONSTANT, OUTPUT_KEYS);
// Hash table configuration; this class reads the build/probe key expressions and the comparators from it.
private HashTableConfig htConfig;
// Fragment context; supplies options, the function registry and the compiled implementation class.
private final FragmentContext context;
// Allocator used to create the key vectors of the hash table container and passed on to the hash table.
private final BufferAllocator allocator;
// Build-side input batch; replaceable through updateIncoming().
private RecordBatch incomingBuild;
// Probe-side input batch; replaceable through updateIncoming(). A non-null value is also what
// setupGetHash() passes as the flag argument of HashPrelUtil.getHashExpression().
private RecordBatch incomingProbe;
// Outgoing batch; may be null (createAndSetupHashTable() only validates output key ids when it is set).
private final RecordBatch outgoing;
// Selects which variant of the key-match method setupIsKeyMatchInternal() generates:
// build-side comparison, probe-side comparison, or the "both keys are null" check.
private enum SetupWork {DO_BUILD, DO_PROBE, CHECK_BOTH_NULLS};
/**
 * Creates a hash table code generator.
 *
 * @param htConfig      hash table configuration (key expressions and comparators are read from it)
 * @param context       fragment context used for options, function lookup and class compilation
 * @param allocator     allocator used for the hash table's key vectors
 * @param incomingBuild build-side input batch
 * @param incomingProbe probe-side input batch
 * @param outgoing      outgoing batch; may be {@code null}
 */
public ChainedHashTable(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator,
    RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing) {
  // Collaborators that never change after construction.
  this.context = context;
  this.allocator = allocator;
  this.outgoing = outgoing;

  // Configuration and the inputs that updateIncoming() may later replace.
  this.htConfig = htConfig;
  this.incomingProbe = incomingProbe;
  this.incomingBuild = incomingBuild;
}
/**
 * Replaces both input batches used by subsequent calls on this generator.
 *
 * @param incomingBuild new build-side input batch
 * @param incomingProbe new probe-side input batch
 */
public void updateIncoming(RecordBatch incomingBuild, RecordBatch incomingProbe) {
  this.incomingProbe = incomingProbe;
  this.incomingBuild = incomingBuild;
}
/**
 * Generates the code of a {@link HashTable} for the configured key expressions, obtains the
 * compiled implementation from the fragment context and sets it up.
 *
 * <p>Steps: materialize the build (and, if configured, probe) key expressions; add implicit casts
 * so both sides agree on types; create one key vector per build key in a fresh container; generate
 * the key-match, set-value, output-keys and hash methods; finally call {@code setup} on the new
 * hash table.
 *
 * @param outKeyFieldIds field ids in the outgoing container that receive the key columns; may be
 *                       {@code null}, in which case no key-output code is generated
 * @return the generated hash table, already set up
 * @throws ClassTransformationException propagated from the code generation / class loading calls made here
 * @throws IOException                  propagated from the code generation / class loading calls made here
 * @throws SchemaChangeException        if a key expression fails to materialize
 * @throws IllegalArgumentException     if an outgoing batch is set and more output key ids are
 *                                      supplied than there are build keys
 */
public HashTable createAndSetupHashTable(TypedFieldId[] outKeyFieldIds) throws ClassTransformationException,
    IOException, SchemaChangeException {
  CodeGenerator<HashTable> top = CodeGenerator.get(HashTable.TEMPLATE_DEFINITION, context.getOptions());
  top.plainJavaCapable(true);
  // Uncomment out this line to debug the generated code.
  // This code is called from generated code, so to step into this code,
  // persist the code generated in HashAggBatch also.
  // top.saveCodeForDebugging(true);
  top.preferPlainJava(true); // use a subclass
  ClassGenerator<HashTable> cg = top.getRoot();
  ClassGenerator<HashTable> cgInner = cg.getInnerGenerator("BatchHolder");

  final List<NamedExpression> buildKeys = htConfig.getKeyExprsBuild();
  final List<NamedExpression> probeKeys = htConfig.getKeyExprsProbe(); // null when there is no probe side

  // One collector is shared by both sides, exactly as errors were accumulated before.
  ErrorCollector collector = new ErrorCollectorImpl();
  VectorContainer htContainerOrig = new VectorContainer(); // original ht container from which others may be cloned
  TypedFieldId[] htKeyFieldIds = new TypedFieldId[buildKeys.size()];

  LogicalExpression[] keyExprsBuild = materializeKeyExprs(buildKeys, incomingBuild, collector);
  LogicalExpression[] keyExprsProbe = null;
  if (probeKeys != null) {
    keyExprsProbe = materializeKeyExprs(probeKeys, incomingProbe, collector);
    JoinUtils.addLeastRestrictiveCasts(keyExprsProbe, incomingProbe, keyExprsBuild, incomingBuild, context);
  }

  /*
   * Once the implicit casts have been added, create the value vectors for the corresponding
   * type and add it to the hash table's container.
   * Note: Adding implicit casts may have a minor impact on the memory foot print. For example
   * if we have a join condition with bigint on the probe side and int on the build side then
   * after this change we will be allocating a bigint vector in the hashtable instead of an int
   * vector.
   */
  int keyIdx = 0;
  for (NamedExpression ne : buildKeys) {
    LogicalExpression expr = keyExprsBuild[keyIdx];
    final MaterializedField outputField = MaterializedField.create(ne.getRef().getLastSegment().getNameSegment().getPath(),
        expr.getMajorType());
    ValueVector vv = TypeHelper.getNewVector(outputField, allocator);
    htKeyFieldIds[keyIdx] = htContainerOrig.add(vv);
    keyIdx++;
  }

  // Only in case of a join: Generate a special method to check if both the new key and the existing key (in this HT bucket) are nulls
  // (used by Hash-Join to avoid creating a long hash-table chain of null keys, which can lead to useless O(n^2) work on that chain.)
  // The logic is: Nulls match on build, and don't match on probe. Note that this logic covers outer joins as well.
  setupIsKeyMatchInternal(cgInner, bothKeysNullIncomingBuildMapping, bothKeysNullHtableMapping, keyExprsBuild,
      htConfig.getComparators(), htKeyFieldIds, SetupWork.CHECK_BOTH_NULLS);

  // generate code for isKeyMatch(), setValue(), getHash() and outputRecordKeys()
  setupIsKeyMatchInternal(cgInner, KeyMatchIncomingBuildMapping, KeyMatchHtableMapping, keyExprsBuild,
      htConfig.getComparators(), htKeyFieldIds, SetupWork.DO_BUILD);
  setupIsKeyMatchInternal(cgInner, KeyMatchIncomingProbeMapping, KeyMatchHtableProbeMapping, keyExprsProbe,
      htConfig.getComparators(), htKeyFieldIds, SetupWork.DO_PROBE);
  setupSetValue(cgInner, keyExprsBuild, htKeyFieldIds);

  // setupOutputRecordKeys() accepts a null id array, so a null array must not blow up this
  // validation either; only a non-null array can be "too long".
  if (outgoing != null && outKeyFieldIds != null && outKeyFieldIds.length > buildKeys.size()) {
    throw new IllegalArgumentException("Mismatched number of output key fields.");
  }
  setupOutputRecordKeys(cgInner, htKeyFieldIds, outKeyFieldIds);

  setupGetHash(cg /* use top level code generator for getHash */, GetHashIncomingBuildMapping, incomingBuild, keyExprsBuild);
  setupGetHash(cg /* use top level code generator for getHash */, GetHashIncomingProbeMapping, incomingProbe, keyExprsProbe);

  HashTable ht = context.getImplementationClass(top);
  ht.setup(htConfig, allocator, incomingBuild.getContainer(), incomingProbe, outgoing, htContainerOrig, context, cgInner);
  return ht;
}

/**
 * Materializes each named key expression against {@code batch}, in list order.
 *
 * <p>The returned array has one slot per named expression. Expressions that materialize to
 * {@code null} are skipped, so the results are packed at the front and any remaining trailing
 * slots stay {@code null}.
 *
 * @param keys      named key expressions to materialize
 * @param batch     batch the expressions are resolved against
 * @param collector error collector shared across calls
 * @return the materialized expressions
 * @throws SchemaChangeException if the collector reports errors after materializing an expression
 */
private LogicalExpression[] materializeKeyExprs(List<NamedExpression> keys, RecordBatch batch,
    ErrorCollector collector) throws SchemaChangeException {
  final LogicalExpression[] materialized = new LogicalExpression[keys.size()];
  int next = 0;
  for (NamedExpression key : keys) {
    final LogicalExpression expr = ExpressionTreeMaterializer.materialize(key.getExpr(), batch, collector, context.getFunctionRegistry());
    if (collector.hasErrors()) {
      throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
    }
    if (expr == null) {
      continue;
    }
    materialized[next] = expr;
    next++;
  }
  return materialized;
}
/**
 * Generates the body of one key-comparison method into the eval block selected by the given
 * mapping sets.
 *
 * <p>For the regular comparisons ({@code DO_BUILD}, {@code DO_PROBE}) the generated code returns
 * {@code false} as soon as one key pair differs and {@code true} after all pairs compared equal.
 * For {@code CHECK_BOTH_NULLS} the logic is inverted: it returns {@code true} as soon as one pair
 * of keys is null on both sides and {@code false} otherwise. When there are no key expressions,
 * or a both-nulls check is requested without any {@code EQUALS} comparator, the generated method
 * simply returns {@code false}.
 *
 * @param cg              generator receiving the code
 * @param incomingMapping mapping set used while evaluating the incoming key expression
 * @param htableMapping   mapping set used while reading the key stored in the hash table
 * @param keyExprs        key expressions of the incoming side; may be {@code null} or empty
 * @param comparators     comparator per key, indexed like {@code keyExprs}
 * @param htKeyFieldIds   field ids of the stored keys, indexed like {@code keyExprs}
 * @param work            which variant to generate
 */
private void setupIsKeyMatchInternal(ClassGenerator<HashTable> cg, MappingSet incomingMapping, MappingSet htableMapping,
    LogicalExpression[] keyExprs, List<Comparator> comparators, TypedFieldId[] htKeyFieldIds, SetupWork work) {
  final boolean bothNullsCheck = (work == SetupWork.CHECK_BOTH_NULLS);

  // Value returned from inside the loop versus value returned after the loop; the both-nulls
  // check uses the opposite polarity of a regular key match.
  final JExpression earlyExit;
  final JExpression fallThrough;
  if (bothNullsCheck) {
    earlyExit = JExpr.TRUE;
    fallThrough = JExpr.FALSE;
  } else {
    earlyExit = JExpr.FALSE;
    fallThrough = JExpr.TRUE;
  }

  cg.setMappingSet(incomingMapping);

  // e.g. for Hash-Aggr, or non-equi join
  final boolean noKeys = (keyExprs == null) || (keyExprs.length == 0);
  if (noKeys || (bothNullsCheck && !comparators.contains(Comparator.EQUALS))) {
    cg.getEvalBlock()._return(JExpr.FALSE);
    return;
  }

  // BUILD runs the both-null logic in a separate method - areBothKeysNull()
  final boolean emitNullShortcut = (work != SetupWork.DO_BUILD);

  for (int keyIdx = 0; keyIdx < keyExprs.length; keyIdx++) {
    // Incoming side of the pair.
    cg.setMappingSet(incomingMapping);
    final HoldingContainer incomingKey = cg.addExpr(keyExprs[keyIdx], ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);

    // Stored (hash table) side of the pair.
    cg.setMappingSet(htableMapping);
    final ValueVectorReadExpression storedKeyRead = new ValueVectorReadExpression(htKeyFieldIds[keyIdx]);
    final HoldingContainer storedKey = cg.addExpr(storedKeyRead, ClassGenerator.BlkCreateMode.FALSE);

    // Special case when both columns are null: return early with the mid-point result.
    if (emitNullShortcut
        && comparators.get(keyIdx) == Comparator.EQUALS
        && incomingKey.isOptional()
        && storedKey.isOptional()) {
      final JExpression incomingIsNull = incomingKey.getIsSet().eq(JExpr.lit(0));
      final JExpression storedIsNull = storedKey.getIsSet().eq(JExpr.lit(0));
      final JConditional bothNull = cg.getEvalBlock()._if(incomingIsNull.cand(storedIsNull));
      bothNull._then()._return(earlyExit);
    }

    // Comparison code (at least one of the two columns' values is non-null).
    if (!bothNullsCheck) {
      final LogicalExpression comparison =
          FunctionGenerationHelper.getOrderingComparatorNullsHigh(incomingKey, storedKey, context.getFunctionRegistry());
      final HoldingContainer comparisonResult = cg.addExpr(comparison, ClassGenerator.BlkCreateMode.FALSE);
      // A comparator result other than 0 means the two values are not equal.
      final JConditional keysDiffer = cg.getEvalBlock()._if(comparisonResult.getValue().ne(JExpr.lit(0)));
      keysDiffer._then()._return(earlyExit);
    }
  }

  // No pair triggered an early exit, so return the appropriate final result.
  cg.getEvalBlock()._return(fallThrough);
}
/**
 * Generates the code that writes every key expression into its key vector of the hash table
 * container, using the set-value mapping.
 *
 * @param cg            generator receiving the code
 * @param keyExprs      key expressions to store, one per key
 * @param htKeyFieldIds field ids of the destination key vectors, indexed like {@code keyExprs}
 */
private void setupSetValue(ClassGenerator<HashTable> cg, LogicalExpression[] keyExprs,
    TypedFieldId[] htKeyFieldIds) {
  cg.setMappingSet(SetValueMapping);
  for (int keyIdx = 0; keyIdx < keyExprs.length; keyIdx++) {
    // The third argument requests the "safe" write variant for every key.
    final ValueVectorWriteExpression storeKey =
        new ValueVectorWriteExpression(htKeyFieldIds[keyIdx], keyExprs[keyIdx], true);
    cg.addExpr(storeKey, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);
  }
}
/**
 * Generates the code that copies stored keys from the hash table container to the outgoing
 * container, using the output-record-keys mapping.
 *
 * @param cg             generator receiving the code
 * @param htKeyFieldIds  field ids of the stored key vectors
 * @param outKeyFieldIds field ids of the destination vectors; when {@code null} only the mapping
 *                       set is switched and no copy code is generated
 */
private void setupOutputRecordKeys(ClassGenerator<HashTable> cg, TypedFieldId[] htKeyFieldIds, TypedFieldId[] outKeyFieldIds) {
  cg.setMappingSet(OutputRecordKeysMapping);
  if (outKeyFieldIds == null) {
    return;
  }
  for (int keyIdx = 0; keyIdx < outKeyFieldIds.length; keyIdx++) {
    final ValueVectorReadExpression storedKey = new ValueVectorReadExpression(htKeyFieldIds[keyIdx]);
    final TypeProtos.MajorType keyType = storedKey.getMajorType();
    // The "safe" write variant is requested for anything that is not fixed width, or is repeated.
    final boolean useSetSafe = !Types.isFixedWidthType(keyType) || Types.isRepeated(keyType);
    final ValueVectorWriteExpression copyKey =
        new ValueVectorWriteExpression(outKeyFieldIds[keyIdx], storedKey, useSetSafe);
    cg.addExpr(copyKey, ClassGenerator.BlkCreateMode.TRUE);
  }
}
/**
 * Generates the body of a hash method: each key's hash expression takes the running seed as a
 * parameter and its result becomes the new seed; the final seed is returned. With no key
 * expressions the generated method returns the literal 0.
 *
 * @param cg              generator receiving the code
 * @param incomingMapping mapping set selecting the hash method and the incoming container
 * @param batch           batch the hash expressions are materialized against
 * @param keyExprs        key expressions to hash; may be {@code null} or empty
 * @throws SchemaChangeException propagated from materializing a hash expression
 */
private void setupGetHash(ClassGenerator<HashTable> cg, MappingSet incomingMapping, VectorAccessible batch, LogicalExpression[] keyExprs)
    throws SchemaChangeException {
  cg.setMappingSet(incomingMapping);

  final boolean nothingToHash = (keyExprs == null) || (keyExprs.length == 0);
  if (nothingToHash) {
    cg.getEvalBlock()._return(JExpr.lit(0));
    return;
  }

  /*
   * The same logic generates the run time hash code for both hash join and hash aggregate.
   * For join everything has to be hashed as double (both for distribution and for comparison),
   * while aggregation can avoid the penalty of casting to double. The flag below is derived from
   * the presence of a probe-side batch.
   */
  final boolean probeSidePresent = (incomingProbe != null);

  /*
   * A logical expression is generated per key so the expressions can be split into blocks if the
   * number of expressions in a method exceeds the upper limit. `seedValue` is the holder used to
   * pass the generated seed value on to the new methods.
   */
  final String seedValue = "seedValue";
  final LogicalExpression seedParam =
      ValueExpressions.getParameterExpression(seedValue, Types.required(TypeProtos.MinorType.INT));

  for (int keyIdx = 0; keyIdx < keyExprs.length; keyIdx++) {
    final LogicalExpression keyHash = HashPrelUtil.getHashExpression(keyExprs[keyIdx], seedParam, probeSidePresent);
    final LogicalExpression boundKeyHash =
        ExpressionTreeMaterializer.materializeAndCheckErrors(keyHash, batch, context.getFunctionRegistry());
    final HoldingContainer hashHolder = cg.addExpr(boundKeyHash, ClassGenerator.BlkCreateMode.TRUE_IF_BOUND);
    // Feed this key's hash into the next key as its seed.
    cg.getEvalBlock().assign(JExpr.ref(seedValue), hashHolder.getValue());
  }

  cg.getEvalBlock()._return(JExpr.ref(seedValue));
}
}