blob: a248d525953cad6d7facbb0beb93bd0ef54f62bd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.engine.planner.physical;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.engine.planner.KeyProjector;
import org.apache.tajo.engine.utils.CacheHolder;
import org.apache.tajo.engine.utils.TableCacheKey;
import org.apache.tajo.exception.TajoInternalError;
import org.apache.tajo.plan.logical.JoinNode;
import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.worker.ExecutionBlockSharedResource;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
/**
 * Common base class for all hash join executors.
 *
 * <p>The right (inner) child is read completely into an in-memory {@link TupleMap} keyed by the
 * join key, either privately for this task or through the broadcast cache shared between the
 * tasks of an execution block. Subclasses decide, through {@link #convert(TupleMap, boolean)},
 * which collection type the loaded tuples are finally kept in.
 *
 * @param <T> Tuple collection type to load small relation onto in-memory
 */
public abstract class CommonHashJoinExec<T> extends CommonJoinExec {

  // temporal tuples and states for hash join

  /** {@code true} until {@link #loadRightToHashTable()} has loaded the right relation. */
  protected boolean first = true;
  /** Hash table built from the right relation; {@code null} before loading and after close. */
  protected TupleMap<T> tupleSlots;
  /** Iteration state owned by subclasses; reset by {@link #rescan()} and {@link #close()}. */
  protected Iterator<Tuple> iterator;

  /** {@code true} iff the plan's join type is CROSS (there are no join keys in that case). */
  protected final boolean isCrossJoin;

  /** Equi-join key pairs ([0] = left column, [1] = right column); {@code null} for cross join. */
  protected final List<Column[]> joinKeyPairs;

  /** Number of columns of the right schema; -1 for cross join. */
  protected final int rightNumCols;
  /** Number of columns of the left schema; -1 for cross join. */
  protected final int leftNumCols;

  /** Left-side join key columns; {@code null} for cross join. */
  protected final Column[] leftKeyList;
  /** Right-side join key columns; {@code null} for cross join. */
  protected final Column[] rightKeyList;

  /** Projects a left tuple onto its join key; {@code null} for cross join. */
  protected final KeyProjector leftKeyExtractor;

  /** Completion flag owned by subclasses; cleared by {@link #rescan()}. */
  protected boolean finished;

  /** Input stats of the right child as stored in the broadcast cache, if the cache was used. */
  protected TableStats tableStatsOfCachedRightChild = null;

  /**
   * Prepares the join key metadata according to the join type of the plan.
   *
   * @param context task attempt context
   * @param plan    logical join node
   * @param outer   left (probe side) child executor
   * @param inner   right (build side) child executor
   * @throws TajoInternalError if a cross join carries a join condition, or an inner join has none
   */
  public CommonHashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec outer, PhysicalExec inner) {
    super(context, plan, outer, inner);

    switch (plan.getJoinType()) {

      case CROSS:
        if (hasJoinQual) {
          throw new TajoInternalError("Cross join cannot evaluate join conditions.");
        }
        // A cross join has no keys at all; every key-related field stays unset.
        isCrossJoin = true;
        joinKeyPairs = null;
        rightNumCols = leftNumCols = -1;
        leftKeyList = rightKeyList = null;
        leftKeyExtractor = null;
        break;

      case INNER:
        // Only INNER is rejected here; the other join types handled by the default
        // branch can have an empty join condition.
        if (!hasJoinQual) {
          throw new TajoInternalError("Inner join must have any join conditions.");
        }
        // fall through: an inner join is set up exactly like the remaining join types

      default:
        isCrossJoin = false;
        // HashJoin only can manage equi join key pairs.
        this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, outer.getSchema(),
            inner.getSchema(), false);

        leftKeyList = new Column[joinKeyPairs.size()];
        rightKeyList = new Column[joinKeyPairs.size()];

        for (int i = 0; i < joinKeyPairs.size(); i++) {
          Column[] pair = joinKeyPairs.get(i);
          // Resolve each key against the child schema by its qualified name.
          leftKeyList[i] = outer.getSchema().getColumn(pair[0].getQualifiedName());
          rightKeyList[i] = inner.getSchema().getColumn(pair[1].getQualifiedName());
        }

        leftNumCols = outer.getSchema().size();
        rightNumCols = inner.getSchema().size();

        leftKeyExtractor = new KeyProjector(leftSchema, leftKeyList);
        break;
    }
  }

  /**
   * Loads the right relation into {@link #tupleSlots} and clears {@link #first}.
   *
   * <p>If a scan executor is found under the right child and it can be broadcast, the hash table
   * is obtained through the shared broadcast cache; otherwise it is built for this task only.
   *
   * @throws IOException if reading the right child or converting the hash table fails
   */
  protected void loadRightToHashTable() throws IOException {
    ScanExec scanExec = PhysicalPlanUtil.findExecutor(rightChild, ScanExec.class);
    // NOTE(review): guarded against null on the assumption that findExecutor may return null
    // when no ScanExec exists under the right child; in that case build without the cache.
    if (scanExec != null && scanExec.canBroadcast()) {
      /* If this table can broadcast, all tasks in a node will share the same cache */
      TableCacheKey key = CacheHolder.BroadcastCacheHolder.getCacheKey(context, scanExec);
      loadRightFromCache(key);
    } else {
      this.tupleSlots = convert(buildRightToHashTable(), false);
    }

    first = false;
  }

  /**
   * Loads the right relation through the broadcast cache of the shared resource.
   *
   * <p>Under the shared resource's lock, the cached hash table is reused when present; otherwise
   * it is built from the right child and registered under {@code key}. The cached table stats are
   * remembered in {@link #tableStatsOfCachedRightChild}.
   *
   * @param key cache key identifying the broadcast table
   * @throws IOException if reading the right child or converting the hash table fails
   */
  protected void loadRightFromCache(TableCacheKey key) throws IOException {
    ExecutionBlockSharedResource sharedResource = context.getSharedResource();

    CacheHolder<TupleMap<TupleList>> holder;
    synchronized (sharedResource.getLock()) {
      if (sharedResource.hasBroadcastCache(key)) {
        holder = sharedResource.getBroadcastCache(key);
      } else {
        TupleMap<TupleList> built = buildRightToHashTable();
        holder = new CacheHolder.BroadcastCacheHolder(built, rightChild.getInputStats(), null);
        // The build loops stop early once the task is stopped, so 'built' may be incomplete.
        // Never publish such a table: other tasks would silently join against partial data.
        if (!context.isStopped()) {
          sharedResource.addBroadcastCache(key, holder);
        }
      }
    }
    this.tableStatsOfCachedRightChild = holder.getTableStats();
    this.tupleSlots = convert(holder.getData(), true);
  }

  /**
   * Builds the hash table of the right relation, choosing the layout by join type.
   *
   * @return the hash table holding the tuples read from the right child
   * @throws IOException if reading the right child fails
   */
  protected TupleMap<TupleList> buildRightToHashTable() throws IOException {
    if (isCrossJoin) {
      return buildRightToHashTableForCrossJoin();
    } else {
      return buildRightToHashTableForNonCrossJoin();
    }
  }

  /**
   * Reads the right child into a single tuple list stored under the {@code null} key.
   *
   * <p>Reading stops early when the task context is stopped.
   *
   * @return a map with exactly one entry ({@code null} key) holding the tuples read
   * @throws IOException if reading the right child fails
   */
  protected TupleMap<TupleList> buildRightToHashTableForCrossJoin() throws IOException {
    Tuple tuple;
    TupleMap<TupleList> map = new TupleMap<>(1);
    TupleList tuples = new TupleList();

    while (!context.isStopped() && (tuple = rightChild.next()) != null) {
      tuples.add(tuple);
    }
    map.put(null, tuples);
    return map;
  }

  /**
   * Reads the right child and groups its tuples by their projected join key.
   *
   * <p>Reading stops early when the task context is stopped.
   *
   * @return a map from join key to the list of right tuples having that key
   * @throws IOException if reading the right child fails
   */
  protected TupleMap<TupleList> buildRightToHashTableForNonCrossJoin() throws IOException {
    Tuple tuple;
    // NOTE(review): 100000 is presumably the initial capacity of the map - confirm in TupleMap.
    TupleMap<TupleList> map = new TupleMap<>(100000);
    KeyProjector keyProjector = new KeyProjector(rightSchema, rightKeyList);

    while (!context.isStopped() && (tuple = rightChild.next()) != null) {
      KeyTuple keyTuple = keyProjector.project(tuple);
      TupleList newValue = map.get(keyTuple);
      if (newValue == null) {
        // First tuple seen for this key: start a new bucket.
        map.put(keyTuple, newValue = new TupleList());
      }
      // if source is scan or groupby, it needs not to be cloned
      newValue.add(tuple);
    }
    return map;
  }

  /**
   * Converts the freshly built or cached hash table into the representation used by the subclass.
   *
   * @param hashed    hash table of the right relation
   * @param fromCache {@code true} if {@code hashed} was obtained through the broadcast cache,
   *                  {@code false} if it was built for this task only
   * @return the hash table to be kept in {@link #tupleSlots}
   * @throws IOException if the conversion fails
   */
  // todo: convert loaded data to cache condition
  protected abstract TupleMap<T> convert(TupleMap<TupleList> hashed, boolean fromCache)
      throws IOException;

  /**
   * Rescans the children and resets the iteration state. The loaded hash table and the
   * {@link #first} flag are left untouched.
   */
  @Override
  public void rescan() throws IOException {
    super.rescan();
    finished = false;
    iterator = null;
  }

  /**
   * Closes the children and releases the iteration state and the loaded hash table.
   */
  @Override
  public void close() throws IOException {
    super.close();
    iterator = null;
    if (tupleSlots != null) {
      tupleSlots.clear();
      tupleSlots = null;
    }
  }

  /**
   * Returns the input stats of this join: the sum of bytes, read bytes and rows of the left and
   * the right inputs. For the right input the stats remembered from the broadcast cache take
   * precedence over the right child's own stats.
   *
   * <p>When the left child is {@code null}, the stats are returned as last computed.
   */
  @Override
  public TableStats getInputStats() {
    if (leftChild == null) {
      return inputStats;
    }
    TableStats leftInputStats = leftChild.getInputStats();
    // Start from zero so that a missing side contributes nothing.
    inputStats.setNumBytes(0);
    inputStats.setReadBytes(0);
    inputStats.setNumRows(0);

    if (leftInputStats != null) {
      inputStats.setNumBytes(leftInputStats.getNumBytes());
      inputStats.setReadBytes(leftInputStats.getReadBytes());
      inputStats.setNumRows(leftInputStats.getNumRows());
    }

    TableStats rightInputStats = tableStatsOfCachedRightChild == null ?
        rightChild.getInputStats() : tableStatsOfCachedRightChild;
    if (rightInputStats != null) {
      inputStats.setNumBytes(inputStats.getNumBytes() + rightInputStats.getNumBytes());
      inputStats.setReadBytes(inputStats.getReadBytes() + rightInputStats.getReadBytes());
      inputStats.setNumRows(inputStats.getNumRows() + rightInputStats.getNumRows());
    }

    return inputStats;
  }
}