| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.hive.ql.exec; |
| |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hive.conf.HiveConf; |
| import org.apache.hadoop.hive.ql.CompilationOpContext; |
| import org.apache.hadoop.hive.ql.exec.persistence.AbstractRowContainer; |
| import org.apache.hadoop.hive.ql.exec.persistence.RowContainer; |
| import org.apache.hadoop.hive.ql.metadata.HiveException; |
| import org.apache.hadoop.hive.ql.plan.JoinCondDesc; |
| import org.apache.hadoop.hive.ql.plan.JoinDesc; |
| import org.apache.hadoop.hive.ql.plan.TableDesc; |
| import org.apache.hadoop.hive.serde2.io.ShortWritable; |
| import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe; |
| import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; |
| import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; |
| import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; |
| import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Join operator implementation. |
| */ |
| public abstract class CommonJoinOperator<T extends JoinDesc> extends |
| Operator<T> implements Serializable { |
| private static final long serialVersionUID = 1L; |
| protected static final Logger LOG = LoggerFactory.getLogger(CommonJoinOperator.class |
| .getName()); |
| |
| protected transient int numAliases; // number of aliases |
| /** |
| * The expressions for join inputs. |
| */ |
| protected transient List<ExprNodeEvaluator>[] joinValues; |
| |
| /** |
| * The filters for join |
| */ |
| protected transient List<ExprNodeEvaluator>[] joinFilters; |
| |
| protected transient int[][] filterMaps; |
| |
| /** |
| * The ObjectInspectors for the join inputs. |
| */ |
| protected transient List<ObjectInspector>[] joinValuesObjectInspectors; |
| |
| /** |
| * The ObjectInspectors for join filters. |
| */ |
| protected transient List<ObjectInspector>[] joinFilterObjectInspectors; |
| /** |
| * The standard ObjectInspectors for the join inputs. |
| */ |
| protected transient List<ObjectInspector>[] joinValuesStandardObjectInspectors; |
| /** |
| * The standard ObjectInspectors for the row container. |
| */ |
| protected transient List<ObjectInspector>[] rowContainerStandardObjectInspectors; |
| |
| protected transient Byte[] order; // order in which the results should |
| // be output |
| protected transient JoinCondDesc[] condn; |
| protected transient boolean[] nullsafes; |
| |
| public transient boolean noOuterJoin; |
| |
| // for outer joins, contains the potential nulls for the concerned aliases |
| protected transient ArrayList<Object>[] dummyObj; |
| |
| // empty rows for each table |
| protected transient RowContainer<List<Object>>[] dummyObjVectors; |
| |
| protected transient int totalSz; // total size of the composite object |
| |
| // keys are the column names. basically this maps the position of the column |
| // in |
| // the output of the CommonJoinOperator to the input columnInfo. |
| private transient Map<Integer, Set<String>> posToAliasMap; |
| |
| transient LazyBinarySerDe[] spillTableSerDe; |
| protected transient TableDesc[] spillTableDesc; // spill tables are |
| // used if the join |
| // input is too large |
| // to fit in memory |
| |
| AbstractRowContainer<List<Object>>[] storage; // map b/w table alias |
| // to RowContainer |
| int joinEmitInterval = -1; |
| int joinCacheSize = 0; |
| long nextSz = 0; |
| transient Byte lastAlias = null; |
| |
| transient boolean handleSkewJoin = false; |
| |
| transient boolean hasLeftSemiJoin = false; |
| |
| protected transient int countAfterReport; |
| protected transient int heartbeatInterval; |
| protected static final int NOTSKIPBIGTABLE = -1; |
| |
| /** Kryo ctor. */ |
| protected CommonJoinOperator() { |
| super(); |
| } |
| |
| public CommonJoinOperator(CompilationOpContext ctx) { |
| super(ctx); |
| } |
| |
| public CommonJoinOperator(CommonJoinOperator<T> clone) { |
| super(clone.id, clone.cContext); |
| this.joinEmitInterval = clone.joinEmitInterval; |
| this.joinCacheSize = clone.joinCacheSize; |
| this.nextSz = clone.nextSz; |
| this.childOperators = clone.childOperators; |
| this.parentOperators = clone.parentOperators; |
| this.done = clone.done; |
| this.storage = clone.storage; |
| this.condn = clone.condn; |
| this.conf = clone.getConf(); |
| this.setSchema(clone.getSchema()); |
| this.alias = clone.alias; |
| this.childOperatorsArray = clone.childOperatorsArray; |
| this.childOperatorsTag = clone.childOperatorsTag; |
| this.colExprMap = clone.colExprMap; |
| this.dummyObj = clone.dummyObj; |
| this.dummyObjVectors = clone.dummyObjVectors; |
| this.forwardCache = clone.forwardCache; |
| this.groupKeyObject = clone.groupKeyObject; |
| this.handleSkewJoin = clone.handleSkewJoin; |
| this.hconf = clone.hconf; |
| this.inputObjInspectors = clone.inputObjInspectors; |
| this.noOuterJoin = clone.noOuterJoin; |
| this.numAliases = clone.numAliases; |
| this.operatorId = clone.operatorId; |
| this.posToAliasMap = clone.posToAliasMap; |
| this.spillTableDesc = clone.spillTableDesc; |
| this.statsMap = clone.statsMap; |
| this.joinFilters = clone.joinFilters; |
| this.joinFilterObjectInspectors = clone.joinFilterObjectInspectors; |
| } |
| |
| private <T extends JoinDesc> ObjectInspector getJoinOutputObjectInspector( |
| Byte[] order, List<ObjectInspector>[] aliasToObjectInspectors, T conf) { |
| List<ObjectInspector> structFieldObjectInspectors = new ArrayList<ObjectInspector>(); |
| for (Byte alias : order) { |
| List<ObjectInspector> oiList = getValueObjectInspectors(alias, aliasToObjectInspectors); |
| if (oiList != null && !oiList.isEmpty()) { |
| structFieldObjectInspectors.addAll(oiList); |
| } |
| } |
| |
| StructObjectInspector joinOutputObjectInspector = ObjectInspectorFactory |
| .getStandardStructObjectInspector(conf.getOutputColumnNames(), |
| structFieldObjectInspectors); |
| return joinOutputObjectInspector; |
| } |
| |
| protected List<ObjectInspector> getValueObjectInspectors( |
| byte alias, List<ObjectInspector>[] aliasToObjectInspectors) { |
| return aliasToObjectInspectors[alias]; |
| } |
| |
| protected Configuration hconf; |
| |
| @Override |
| @SuppressWarnings("unchecked") |
| protected void initializeOp(Configuration hconf) throws HiveException { |
| super.initializeOp(hconf); |
| this.handleSkewJoin = conf.getHandleSkewJoin(); |
| this.hconf = hconf; |
| |
| heartbeatInterval = HiveConf.getIntVar(hconf, |
| HiveConf.ConfVars.HIVESENDHEARTBEAT); |
| countAfterReport = 0; |
| |
| totalSz = 0; |
| |
| int tagLen = conf.getTagLength(); |
| // Map that contains the rows for each alias |
| storage = new AbstractRowContainer[tagLen]; |
| |
| numAliases = conf.getExprs().size(); |
| |
| joinValues = new List[tagLen]; |
| |
| joinFilters = new List[tagLen]; |
| |
| order = conf.getTagOrder(); |
| condn = conf.getConds(); |
| nullsafes = conf.getNullSafes(); |
| noOuterJoin = conf.isNoOuterJoin(); |
| |
| totalSz = JoinUtil.populateJoinKeyValue(joinValues, conf.getExprs(), |
| order,NOTSKIPBIGTABLE); |
| |
| //process join filters |
| joinFilters = new List[tagLen]; |
| JoinUtil.populateJoinKeyValue(joinFilters, conf.getFilters(),order,NOTSKIPBIGTABLE); |
| |
| |
| joinValuesObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinValues, |
| inputObjInspectors,NOTSKIPBIGTABLE, tagLen); |
| joinFilterObjectInspectors = JoinUtil.getObjectInspectorsFromEvaluators(joinFilters, |
| inputObjInspectors,NOTSKIPBIGTABLE, tagLen); |
| joinValuesStandardObjectInspectors = JoinUtil.getStandardObjectInspectors( |
| joinValuesObjectInspectors,NOTSKIPBIGTABLE, tagLen); |
| |
| filterMaps = conf.getFilterMap(); |
| |
| if (noOuterJoin) { |
| rowContainerStandardObjectInspectors = joinValuesStandardObjectInspectors; |
| } else { |
| List<ObjectInspector>[] rowContainerObjectInspectors = new List[tagLen]; |
| for (Byte alias : order) { |
| ArrayList<ObjectInspector> rcOIs = new ArrayList<ObjectInspector>(); |
| rcOIs.addAll(joinValuesObjectInspectors[alias]); |
| // for each alias, add object inspector for short as the last element |
| rcOIs.add( |
| PrimitiveObjectInspectorFactory.writableShortObjectInspector); |
| rowContainerObjectInspectors[alias] = rcOIs; |
| } |
| rowContainerStandardObjectInspectors = |
| JoinUtil.getStandardObjectInspectors(rowContainerObjectInspectors,NOTSKIPBIGTABLE, tagLen); |
| } |
| |
| dummyObj = new ArrayList[numAliases]; |
| dummyObjVectors = new RowContainer[numAliases]; |
| |
| joinEmitInterval = HiveConf.getIntVar(hconf, |
| HiveConf.ConfVars.HIVEJOINEMITINTERVAL); |
| joinCacheSize = HiveConf.getIntVar(hconf, |
| HiveConf.ConfVars.HIVEJOINCACHESIZE); |
| |
| // construct dummy null row (indicating empty table) and |
| // construct spill table serde which is used if input is too |
| // large to fit into main memory. |
| byte pos = 0; |
| for (Byte alias : order) { |
| int sz = conf.getExprs().get(alias).size(); |
| ArrayList<Object> nr = new ArrayList<Object>(sz); |
| |
| for (int j = 0; j < sz; j++) { |
| nr.add(null); |
| } |
| |
| if (!noOuterJoin) { |
| // add whether the row is filtered or not |
| // this value does not matter for the dummyObj |
| // because the join values are already null |
| nr.add(new ShortWritable()); |
| } |
| dummyObj[pos] = nr; |
| // there should be only 1 dummy object in the RowContainer |
| RowContainer<List<Object>> values = JoinUtil.getRowContainer(hconf, |
| rowContainerStandardObjectInspectors[pos], |
| alias, 1, spillTableDesc, conf, !hasFilter(pos), reporter); |
| |
| values.addRow(dummyObj[pos]); |
| dummyObjVectors[pos] = values; |
| |
| // if serde is null, the input doesn't need to be spilled out |
| // e.g., the output columns does not contains the input table |
| RowContainer<List<Object>> rc = JoinUtil.getRowContainer(hconf, |
| rowContainerStandardObjectInspectors[pos], |
| alias, joinCacheSize, spillTableDesc, conf, !hasFilter(pos), reporter); |
| storage[pos] = rc; |
| |
| pos++; |
| } |
| |
| forwardCache = new Object[totalSz]; |
| aliasFilterTags = new short[numAliases]; |
| Arrays.fill(aliasFilterTags, (byte)0xff); |
| |
| filterTags = new short[numAliases]; |
| skipVectors = new boolean[numAliases][]; |
| for(int i = 0; i < skipVectors.length; i++) { |
| skipVectors[i] = new boolean[i + 1]; |
| } |
| intermediate = new List[numAliases]; |
| |
| offsets = new int[numAliases + 1]; |
| int sum = 0; |
| for (int i = 0; i < numAliases; i++) { |
| offsets[i] = sum; |
| sum += joinValues[order[i]].size(); |
| } |
| offsets[numAliases] = sum; |
| |
| outputObjInspector = getJoinOutputObjectInspector(order, |
| joinValuesStandardObjectInspectors, conf); |
| |
| for( int i = 0; i < condn.length; i++ ) { |
| if(condn[i].getType() == JoinDesc.LEFT_SEMI_JOIN) { |
| hasLeftSemiJoin = true; |
| } |
| } |
| |
| if (isLogInfoEnabled) { |
| LOG.info("JOIN " + outputObjInspector.getTypeName() + " totalsz = " + totalSz); |
| } |
| } |
| |
| transient boolean newGroupStarted = false; |
| |
| @Override |
| public void startGroup() throws HiveException { |
| newGroupStarted = true; |
| for (AbstractRowContainer<List<Object>> alw : storage) { |
| alw.clearRows(); |
| } |
| super.startGroup(); |
| } |
| |
| protected long getNextSize(long sz) { |
| // A very simple counter to keep track of join entries for a key |
| if (sz >= 100000) { |
| return sz + 100000; |
| } |
| |
| return 2 * sz; |
| } |
| |
| protected transient Byte alias; |
| protected transient Object[] forwardCache; |
| |
| // pre-calculated offset values for each alias |
| protected transient int[] offsets; |
| |
| // a array of bitvectors where each entry denotes whether the element is to |
| // be used or not (whether it is null or not). The size of the bitvector is |
| // same as the number of inputs(aliases) under consideration currently. |
| // When all inputs are accounted for, the output is forwarded appropriately. |
| protected transient boolean[][] skipVectors; |
| |
| // caches objects before constructing forward cache |
| protected transient List[] intermediate; |
| |
| // filter tags for objects |
| protected transient short[] filterTags; |
| |
| /** |
| * On filterTags |
| * |
| * ANDed value of all filter tags in current join group |
| * if any of values passes on outer join alias (which makes zero for the tag alias), |
| * it means there exists a pair for it and safely regarded as a inner join |
| * |
| * for example, with table a, b something like, |
| * a = 100, 10 | 100, 20 | 100, 30 |
| * b = 100, 10 | 100, 20 | 100, 30 |
| * |
| * the query "a FO b ON a.k=b.k AND a.v>10 AND b.v>30" makes filter map |
| * 0(a) = [1(b),1] : a.v>10 |
| * 1(b) = [0(a),1] : b.v>30 |
| * |
| * for filtered rows in a (100,10) create a-NULL |
| * for filtered rows in b (100,10) (100,20) (100,30) create NULL-b |
| * |
| * with 0(a) = [1(b),1] : a.v>10 |
| * 100, 10 = 00000010 (filtered) |
| * 100, 20 = 00000000 (valid) |
| * 100, 30 = 00000000 (valid) |
| * ------------------------- |
| * sum = 00000000 : for valid rows in b, there is at least one pair in a |
| * |
| * with 1(b) = [0(a),1] : b.v>30 |
| * 100, 10 = 00000001 (filtered) |
| * 100, 20 = 00000001 (filtered) |
| * 100, 30 = 00000001 (filtered) |
| * ------------------------- |
| * sum = 00000001 : for valid rows in a (100,20) (100,30), there is no pair in b |
| * |
| * result : |
| * 100, 10 : N, N |
| * N, N : 100, 10 |
| * N, N : 100, 20 |
| * N, N : 100, 30 |
| * 100, 20 : N, N |
| * 100, 30 : N, N |
| */ |
| protected transient short[] aliasFilterTags; |
| |
| // all evaluation should be processed here for valid aliasFilterTags |
| // |
| // for MapJoin, filter tag is pre-calculated in MapredLocalTask and stored with value. |
| // when reading the hashtable, MapJoinObjectValue calculates alias filter and provide it to join |
| protected List<Object> getFilteredValue(byte alias, Object row) throws HiveException { |
| boolean hasFilter = hasFilter(alias); |
| List<Object> nr = JoinUtil.computeValues(row, joinValues[alias], |
| joinValuesObjectInspectors[alias], hasFilter); |
| if (hasFilter) { |
| short filterTag = JoinUtil.isFiltered(row, joinFilters[alias], |
| joinFilterObjectInspectors[alias], filterMaps[alias]); |
| nr.add(new ShortWritable(filterTag)); |
| aliasFilterTags[alias] &= filterTag; |
| } |
| return nr; |
| } |
| |
| // fill forwardCache with skipvector |
| private void createForwardJoinObject(boolean[] skip) throws HiveException { |
| Arrays.fill(forwardCache, null); |
| |
| boolean forward = false; |
| for (int i = 0; i < numAliases; i++) { |
| if (!skip[i]) { |
| for (int j = offsets[i]; j < offsets[i + 1]; j++) { |
| forwardCache[j] = intermediate[i].get(j - offsets[i]); |
| } |
| forward = true; |
| } |
| } |
| if (forward) { |
| internalForward(forwardCache, outputObjInspector); |
| countAfterReport = 0; |
| } |
| } |
| |
| // entry point (aliasNum = 0) |
| private void genJoinObject() throws HiveException { |
| boolean rightFirst = true; |
| boolean hasFilter = hasFilter(order[0]); |
| AbstractRowContainer.RowIterator<List<Object>> iter = storage[order[0]].rowIter(); |
| for (List<Object> rightObj = iter.first(); rightObj != null; rightObj = iter.next()) { |
| boolean rightNull = rightObj == dummyObj[0]; |
| if (hasFilter) { |
| filterTags[0] = getFilterTag(rightObj); |
| } |
| skipVectors[0][0] = rightNull; |
| intermediate[0] = rightObj; |
| |
| genObject(1, rightFirst, rightNull); |
| rightFirst = false; |
| } |
| } |
| |
| // creates objects in recursive manner |
| private void genObject(int aliasNum, boolean allLeftFirst, boolean allLeftNull) |
| throws HiveException { |
| if (aliasNum < numAliases) { |
| |
| boolean[] skip = skipVectors[aliasNum]; |
| boolean[] prevSkip = skipVectors[aliasNum - 1]; |
| |
| JoinCondDesc joinCond = condn[aliasNum - 1]; |
| int type = joinCond.getType(); |
| int left = joinCond.getLeft(); |
| int right = joinCond.getRight(); |
| |
| // search for match in the rhs table |
| AbstractRowContainer<List<Object>> aliasRes = storage[order[aliasNum]]; |
| |
| boolean done = false; |
| boolean loopAgain = false; |
| boolean tryLOForFO = type == JoinDesc.FULL_OUTER_JOIN; |
| |
| boolean rightFirst = true; |
| AbstractRowContainer.RowIterator<List<Object>> iter = aliasRes.rowIter(); |
| for (List<Object> rightObj = iter.first(); !done && rightObj != null; |
| rightObj = loopAgain ? rightObj : iter.next(), rightFirst = loopAgain = false) { |
| System.arraycopy(prevSkip, 0, skip, 0, prevSkip.length); |
| |
| boolean rightNull = rightObj == dummyObj[aliasNum]; |
| if (hasFilter(order[aliasNum])) { |
| filterTags[aliasNum] = getFilterTag(rightObj); |
| } |
| skip[right] = rightNull; |
| |
| if (type == JoinDesc.INNER_JOIN) { |
| innerJoin(skip, left, right); |
| } else if (type == JoinDesc.LEFT_SEMI_JOIN) { |
| if (innerJoin(skip, left, right)) { |
| // if left-semi-join found a match, skipping the rest of the rows in the |
| // rhs table of the semijoin |
| done = true; |
| } |
| } else if (type == JoinDesc.LEFT_OUTER_JOIN || |
| (type == JoinDesc.FULL_OUTER_JOIN && rightNull)) { |
| int result = leftOuterJoin(skip, left, right); |
| if (result < 0) { |
| continue; |
| } |
| done = result > 0; |
| } else if (type == JoinDesc.RIGHT_OUTER_JOIN || |
| (type == JoinDesc.FULL_OUTER_JOIN && allLeftNull)) { |
| if (allLeftFirst && !rightOuterJoin(skip, left, right) || |
| !allLeftFirst && !innerJoin(skip, left, right)) { |
| continue; |
| } |
| } else if (type == JoinDesc.FULL_OUTER_JOIN) { |
| if (tryLOForFO && leftOuterJoin(skip, left, right) > 0) { |
| loopAgain = allLeftFirst; |
| done = !loopAgain; |
| tryLOForFO = false; |
| } else if (allLeftFirst && !rightOuterJoin(skip, left, right) || |
| !allLeftFirst && !innerJoin(skip, left, right)) { |
| continue; |
| } |
| } |
| intermediate[aliasNum] = rightObj; |
| |
| // recursively call the join the other rhs tables |
| genObject(aliasNum + 1, allLeftFirst && rightFirst, allLeftNull && rightNull); |
| } |
| } else if (!allLeftNull) { |
| createForwardJoinObject(skipVectors[numAliases - 1]); |
| } |
| } |
| |
| // inner join |
| private boolean innerJoin(boolean[] skip, int left, int right) { |
| if (!isInnerJoin(skip, left, right)) { |
| Arrays.fill(skip, true); |
| return false; |
| } |
| return true; |
| } |
| |
| // LO |
| // |
| // LEFT\RIGHT skip filtered valid |
| // skip --(1) --(1) --(1) |
| // filtered +-(1) +-(1) +-(1) |
| // valid +-(1) +-(4*) ++(2) |
| // |
| // * If right alias has any pair for left alias, continue (3) |
| // -1 for continue : has pair but not in this turn |
| // 0 for inner join (++) : join and continue LO |
| // 1 for left outer join (+-) : join and skip further LO |
| private int leftOuterJoin(boolean[] skip, int left, int right) { |
| if (skip[left] || skip[right] || !isLeftValid(left, right)) { |
| skip[right] = true; |
| return 1; // case 1 |
| } |
| if (isRightValid(left, right)) { |
| return 0; // case 2 |
| } |
| if (hasRightPairForLeft(left, right)) { |
| return -1; // case 3 |
| } |
| skip[right] = true; |
| return 1; // case 4 |
| } |
| |
| // RO |
| // |
| // LEFT\RIGHT skip filtered valid |
| // skip --(1) -+(1) -+(1) |
| // filtered --(1) -+(1) -+(4*) |
| // valid --(1) -+(1) ++(2) |
| // |
| // * If left alias has any pair for right alias, continue (3) |
| // false for continue : has pair but not in this turn |
| private boolean rightOuterJoin(boolean[] skip, int left, int right) { |
| if (skip[left] || skip[right] || !isRightValid(left, right)) { |
| Arrays.fill(skip, 0, right, true); |
| return true; // case 1 |
| } |
| if (isLeftValid(left, right)) { |
| return true; // case 2 |
| } |
| if (hasLeftPairForRight(left, right)) { |
| return false; // case 3 |
| } |
| Arrays.fill(skip, 0, right, true); |
| return true; // case 4 |
| } |
| |
| // If left and right aliases are all valid, two values will be inner joined, |
| private boolean isInnerJoin(boolean[] skip, int left, int right) { |
| return !skip[left] && !skip[right] && |
| isLeftValid(left, right) && isRightValid(left, right); |
| } |
| |
| // check if left is valid |
| private boolean isLeftValid(int left, int right) { |
| return !hasFilter(left) || !JoinUtil.isFiltered(filterTags[left], right); |
| } |
| |
| // check if right is valid |
| private boolean isRightValid(int left, int right) { |
| return !hasFilter(right) || !JoinUtil.isFiltered(filterTags[right], left); |
| } |
| |
| // check if any left pair exists for right objects |
| private boolean hasLeftPairForRight(int left, int right) { |
| return !JoinUtil.isFiltered(aliasFilterTags[left], right); |
| } |
| |
| // check if any right pair exists for left objects |
| private boolean hasRightPairForLeft(int left, int right) { |
| return !JoinUtil.isFiltered(aliasFilterTags[right], left); |
| } |
| |
| private boolean hasAnyFiltered(int alias, List<Object> row) { |
| return row == dummyObj[alias] || hasFilter(alias) && JoinUtil.hasAnyFiltered(getFilterTag(row)); |
| } |
| |
| protected final boolean hasFilter(int alias) { |
| return filterMaps != null && filterMaps[alias] != null; |
| } |
| |
| // get tag value from object (last of list) |
| protected final short getFilterTag(List<Object> row) { |
| return ((ShortWritable) row.get(row.size() - 1)).get(); |
| } |
| |
| /** |
| * Forward a record of join results. |
| * |
| * @throws HiveException |
| */ |
| @Override |
| public void endGroup() throws HiveException { |
| checkAndGenObject(); |
| } |
| |
| protected void internalForward(Object row, ObjectInspector outputOI) throws HiveException { |
| forward(row, outputOI); |
| } |
| |
| private void genUniqueJoinObject(int aliasNum, int forwardCachePos) |
| throws HiveException { |
| AbstractRowContainer.RowIterator<List<Object>> iter = storage[order[aliasNum]].rowIter(); |
| for (List<Object> row = iter.first(); row != null; row = iter.next()) { |
| reportProgress(); |
| int sz = joinValues[order[aliasNum]].size(); |
| int p = forwardCachePos; |
| for (int j = 0; j < sz; j++) { |
| forwardCache[p++] = row.get(j); |
| } |
| if (aliasNum == numAliases - 1) { |
| internalForward(forwardCache, outputObjInspector); |
| countAfterReport = 0; |
| } else { |
| genUniqueJoinObject(aliasNum + 1, p); |
| } |
| } |
| } |
| |
| private void genAllOneUniqueJoinObject() |
| throws HiveException { |
| int p = 0; |
| for (int i = 0; i < numAliases; i++) { |
| int sz = joinValues[order[i]].size(); |
| List<Object> obj = storage[order[i]].rowIter().first(); |
| for (int j = 0; j < sz; j++) { |
| forwardCache[p++] = obj.get(j); |
| } |
| } |
| |
| internalForward(forwardCache, outputObjInspector); |
| countAfterReport = 0; |
| } |
| |
| protected void checkAndGenObject() throws HiveException { |
| if (condn[0].getType() == JoinDesc.UNIQUE_JOIN) { |
| |
| // Check if results need to be emitted. |
| // Results only need to be emitted if there is a non-null entry in a table |
| // that is preserved or if there are no non-null entries |
| boolean preserve = false; // Will be true if there is a non-null entry |
| // in a preserved table |
| boolean hasNulls = false; // Will be true if there are null entries |
| boolean allOne = true; |
| for (int i = 0; i < numAliases; i++) { |
| Byte alias = order[i]; |
| AbstractRowContainer<List<Object>> alw = storage[alias]; |
| |
| if (!alw.isSingleRow()) { |
| allOne = false; |
| } |
| |
| if (!alw.hasRows()) { |
| alw.addRow(dummyObj[i]); |
| hasNulls = true; |
| } else if (condn[i].getPreserved()) { |
| preserve = true; |
| } |
| } |
| |
| if (hasNulls && !preserve) { |
| return; |
| } |
| |
| if (allOne) { |
| genAllOneUniqueJoinObject(); |
| } else { |
| genUniqueJoinObject(0, 0); |
| } |
| } else { |
| // does any result need to be emitted |
| boolean mayHasMoreThanOne = false; |
| boolean hasEmpty = false; |
| for (int i = 0; i < numAliases; i++) { |
| Byte alias = order[i]; |
| AbstractRowContainer<List<Object>> alw = storage[alias]; |
| |
| if (noOuterJoin) { |
| if (!alw.hasRows()) { |
| return; |
| } else if (!alw.isSingleRow()) { |
| mayHasMoreThanOne = true; |
| } |
| } else { |
| if (!alw.hasRows()) { |
| hasEmpty = true; |
| alw.addRow(dummyObj[i]); |
| } else if (!hasEmpty && alw.isSingleRow()) { |
| if (hasAnyFiltered(alias, alw.rowIter().first())) { |
| hasEmpty = true; |
| } |
| } else { |
| mayHasMoreThanOne = true; |
| if (!hasEmpty) { |
| AbstractRowContainer.RowIterator<List<Object>> iter = alw.rowIter(); |
| for (List<Object> row = iter.first(); row != null; row = iter.next()) { |
| reportProgress(); |
| if (hasAnyFiltered(alias, row)) { |
| hasEmpty = true; |
| break; |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| if (!hasEmpty && !mayHasMoreThanOne) { |
| genAllOneUniqueJoinObject(); |
| } else if (!hasEmpty && !hasLeftSemiJoin) { |
| genUniqueJoinObject(0, 0); |
| } else { |
| genJoinObject(); |
| } |
| } |
| Arrays.fill(aliasFilterTags, (byte)0xff); |
| } |
| |
| protected void reportProgress() { |
| // Send some status periodically |
| countAfterReport++; |
| |
| if ((countAfterReport % heartbeatInterval) == 0 |
| && (reporter != null)) { |
| reporter.progress(); |
| countAfterReport = 0; |
| } |
| } |
| |
| /** |
| * All done. |
| * |
| */ |
| @Override |
| public void closeOp(boolean abort) throws HiveException { |
| for (AbstractRowContainer<List<Object>> alw : storage) { |
| if (alw != null) { |
| alw.clearRows(); // clean up the temp files |
| } |
| } |
| Arrays.fill(storage, null); |
| } |
| |
| @Override |
| public String getName() { |
| return CommonJoinOperator.getOperatorName(); |
| } |
| |
| static public String getOperatorName() { |
| return "JOIN"; |
| } |
| |
| /** |
| * @return the posToAliasMap |
| */ |
| public Map<Integer, Set<String>> getPosToAliasMap() { |
| return posToAliasMap; |
| } |
| |
| /** |
| * @param posToAliasMap |
| * the posToAliasMap to set |
| */ |
| public void setPosToAliasMap(Map<Integer, Set<String>> posToAliasMap) { |
| this.posToAliasMap = posToAliasMap; |
| } |
| |
| @Override |
| public boolean opAllowedBeforeMapJoin() { |
| return false; |
| } |
| |
| @Override |
| public boolean opAllowedAfterMapJoin() { |
| return false; |
| } |
| } |