| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tajo.engine.planner.physical; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.tajo.catalog.Column; |
| import org.apache.tajo.datum.Datum; |
| import org.apache.tajo.plan.expr.AggregationFunctionCallEval; |
| import org.apache.tajo.plan.function.FunctionContext; |
| import org.apache.tajo.plan.logical.DistinctGroupbyNode; |
| import org.apache.tajo.plan.logical.GroupbyNode; |
| import org.apache.tajo.storage.Tuple; |
| import org.apache.tajo.storage.VTuple; |
| import org.apache.tajo.worker.TaskAttemptContext; |
| |
| import java.io.IOException; |
| import java.util.*; |
| |
| /** |
| * This class adjusts shuffle columns between DistinctGroupbyFirstAggregationExec and |
| * DistinctGroupbyThirdAggregationExec. It shuffled by grouping columns and aggregation columns. Because of the |
| * shuffle, more DistinctGroupbyThirdAggregationExec will execute compare than previous two distinct group by |
| * algorithm. And then, many DistinctGroupbyThirdAggregationExec improve the performance of count distinct query. |
| * |
| * For example, there is a query as follows: |
| * select sum(distinct l_orderkey), l_linenumber, l_returnflag, l_linestatus, l_shipdate, |
| * count(distinct l_partkey), sum(l_orderkey) |
| * from lineitem |
| * group by l_linenumber, l_returnflag, l_linestatus, l_shipdate; |
| * |
| * In this case, execution plan for this operator will set shuffle type as follows: |
| * Incoming: 1 => 2 (type=HASH_SHUFFLE, key=?distinctseq (INT2), default.lineitem.l_linenumber (INT4), |
| * default.lineitem.l_returnflag (TEXT), default.lineitem.l_linestatus (TEXT), default.lineitem.l_shipdate (TEXT), |
| * default.lineitem.l_partkey (INT4), default.lineitem.l_orderkey (INT4), num=32) |
| * |
| * Outgoing: 2 => 3 (type=HASH_SHUFFLE, key=default.lineitem.l_linenumber (INT4), |
| * default.lineitem.l_returnflag (TEXT), default.lineitem.l_linestatus (TEXT), |
| * default.lineitem.l_shipdate (TEXT), num=32) |
| * |
| * For reference, input data and output data results as follows: |
| * |
| * ------------------------------------------------------------------------------------------------------------------- |
| * NodeSequence, l_linenumber, l_returnflag, l_linestatus, l_shipdate, l_partkey for distinct, |
| * l_orderkey for distinct, l_orderkey for nondistinct |
| * ------------------------------------------------------------------------------------------------------------------- |
| * 0, 2, R, F, 1993-11-09, 3, NULL, 3 |
| * 0, 2, N, O, 1996-04-12, 1, NULL, 1 |
| * 0, 1, N, O, 1997-01-28, 2, NULL, 2 |
| * 0, 1, R, F, 1994-02-02, 2, NULL, 3 |
| * 0, 1, N, O, 1996-03-13, 1, NULL, 1 |
| * 1, 2, R, F, 1993-11-09, NULL, 3, NULL |
| * 1, 2, N, O, 1996-04-12, NULL, 1, NULL |
| * 1, 1, N, O, 1997-01-28, NULL, 2, NULL |
| * 1, 1, R, F, 1994-02-02, NULL, 3, NULL |
| * 1, 1, N, O, 1996-03-13, NULL, 1, NULL |
| * |
| */ |
| public class DistinctGroupbySecondAggregationExec extends UnaryPhysicalExec { |
| private static Log LOG = LogFactory.getLog(DistinctGroupbySecondAggregationExec.class); |
| private DistinctGroupbyNode plan; |
| |
| private boolean finished = false; |
| |
| private int numGroupingColumns; |
| private int[][] distinctKeyIndexes; |
| private FunctionContext[] nonDistinctAggrContexts; |
| private AggregationFunctionCallEval[] nonDistinctAggrFunctions; |
| private int nonDistinctAggrTupleStartIndex = -1; |
| |
| // Key tuples may have various lengths. The below two maps are used to cache key tuple instances. |
| // Each map is a mapping of key length to key tuple. |
| private Map<Integer, Tuple> keyTupleMap = new HashMap<Integer, Tuple>(); |
| private Map<Integer, Tuple> prevKeyTupleMap = new HashMap<Integer, Tuple>(); |
| |
| private Tuple prevKeyTuple = null; |
| private Tuple prevTuple = null; |
| private final Tuple outTuple; |
| private int prevSeq = -1; |
| |
| public DistinctGroupbySecondAggregationExec(TaskAttemptContext context, DistinctGroupbyNode plan, SortExec sortExec) |
| throws IOException { |
| super(context, plan.getInSchema(), plan.getOutSchema(), sortExec); |
| this.plan = plan; |
| outTuple = new VTuple(outSchema.size()); |
| } |
| |
| @Override |
| public void init() throws IOException { |
| super.init(); |
| numGroupingColumns = plan.getGroupingColumns().length; |
| |
| List<GroupbyNode> groupbyNodes = plan.getSubPlans(); |
| |
| // Finding distinct group by column index. |
| Set<Integer> groupingKeyIndexSet = new HashSet<Integer>(); |
| for (Column col: plan.getGroupingColumns()) { |
| int keyIndex; |
| if (col.hasQualifier()) { |
| keyIndex = inSchema.getColumnId(col.getQualifiedName()); |
| } else { |
| keyIndex = inSchema.getColumnIdByName(col.getSimpleName()); |
| } |
| groupingKeyIndexSet.add(keyIndex); |
| } |
| |
| int numDistinct = 0; |
| for (GroupbyNode eachGroupby : groupbyNodes) { |
| if (eachGroupby.isDistinct()) { |
| numDistinct++; |
| } else { |
| nonDistinctAggrFunctions = eachGroupby.getAggFunctions(); |
| if (nonDistinctAggrFunctions != null) { |
| for (AggregationFunctionCallEval eachFunction: nonDistinctAggrFunctions) { |
| eachFunction.bind(context.getEvalContext(), inSchema); |
| eachFunction.setIntermediatePhase(); |
| } |
| nonDistinctAggrContexts = new FunctionContext[nonDistinctAggrFunctions.length]; |
| } |
| } |
| } |
| |
| int index = 0; |
| distinctKeyIndexes = new int[numDistinct][]; |
| for (GroupbyNode eachGroupby : groupbyNodes) { |
| if (eachGroupby.isDistinct()) { |
| List<Integer> distinctGroupingKeyIndex = new ArrayList<Integer>(); |
| Column[] distinctGroupingColumns = eachGroupby.getGroupingColumns(); |
| for (int idx = 0; idx < distinctGroupingColumns.length; idx++) { |
| Column col = distinctGroupingColumns[idx]; |
| int keyIndex; |
| if (col.hasQualifier()) { |
| keyIndex = inSchema.getColumnId(col.getQualifiedName()); |
| } else { |
| keyIndex = inSchema.getColumnIdByName(col.getSimpleName()); |
| } |
| if (!groupingKeyIndexSet.contains(keyIndex)) { |
| distinctGroupingKeyIndex.add(keyIndex); |
| } |
| } |
| int i = 0; |
| distinctKeyIndexes[index] = new int[distinctGroupingKeyIndex.size()]; |
| for (int eachIdx : distinctGroupingKeyIndex) { |
| distinctKeyIndexes[index][i++] = eachIdx; |
| } |
| index++; |
| } |
| } |
| if (nonDistinctAggrFunctions != null) { |
| nonDistinctAggrTupleStartIndex = inSchema.size() - nonDistinctAggrFunctions.length; |
| } |
| } |
| |
| @Override |
| public Tuple next() throws IOException { |
| if (finished) { |
| return null; |
| } |
| |
| while (!context.isStopped()) { |
| Tuple tuple = child.next(); |
| if (tuple == null) { |
| finished = true; |
| |
| if (prevTuple == null) { |
| // Empty case |
| return null; |
| } |
| if (prevSeq == 0 && nonDistinctAggrFunctions != null) { |
| terminatedNonDistinctAggr(prevTuple); |
| } |
| outTuple.put(prevTuple.getValues()); |
| return outTuple; |
| } |
| |
| int distinctSeq = tuple.getInt2(0); |
| Tuple keyTuple = getKeyTuple(distinctSeq, tuple); |
| |
| if (prevKeyTuple == null) { |
| // First |
| if (distinctSeq == 0 && nonDistinctAggrFunctions != null) { |
| initNonDistinctAggrContext(); |
| mergeNonDistinctAggr(tuple); |
| } |
| prevKeyTuple = getKeyTuple(prevKeyTupleMap, keyTuple.getValues()); |
| prevTuple = new VTuple(tuple.getValues()); |
| prevSeq = distinctSeq; |
| continue; |
| } |
| |
| if (!prevKeyTuple.equals(keyTuple)) { |
| // new grouping key |
| if (prevSeq == 0 && nonDistinctAggrFunctions != null) { |
| terminatedNonDistinctAggr(prevTuple); |
| } |
| outTuple.put(prevTuple.getValues()); |
| |
| prevKeyTuple = getKeyTuple(prevKeyTupleMap, keyTuple.getValues()); |
| prevTuple.put(tuple.getValues()); |
| prevSeq = distinctSeq; |
| |
| if (distinctSeq == 0 && nonDistinctAggrFunctions != null) { |
| initNonDistinctAggrContext(); |
| mergeNonDistinctAggr(tuple); |
| } |
| return outTuple; |
| } else { |
| prevKeyTuple = getKeyTuple(prevKeyTupleMap, keyTuple.getValues()); |
| prevTuple.put(tuple.getValues()); |
| prevSeq = distinctSeq; |
| if (distinctSeq == 0 && nonDistinctAggrFunctions != null) { |
| mergeNonDistinctAggr(tuple); |
| } |
| } |
| } |
| |
| return null; |
| } |
| |
| private void initNonDistinctAggrContext() { |
| if (nonDistinctAggrFunctions != null) { |
| nonDistinctAggrContexts = new FunctionContext[nonDistinctAggrFunctions.length]; |
| for (int i = 0; i < nonDistinctAggrFunctions.length; i++) { |
| nonDistinctAggrContexts[i] = nonDistinctAggrFunctions[i].newContext(); |
| } |
| } |
| } |
| |
| private void mergeNonDistinctAggr(Tuple tuple) { |
| if (nonDistinctAggrFunctions == null) { |
| return; |
| } |
| for (int i = 0; i < nonDistinctAggrFunctions.length; i++) { |
| nonDistinctAggrFunctions[i].merge(nonDistinctAggrContexts[i], tuple); |
| } |
| } |
| |
| private void terminatedNonDistinctAggr(Tuple tuple) { |
| if (nonDistinctAggrFunctions == null) { |
| return; |
| } |
| for (int i = 0; i < nonDistinctAggrFunctions.length; i++) { |
| tuple.put(nonDistinctAggrTupleStartIndex + i, nonDistinctAggrFunctions[i].terminate(nonDistinctAggrContexts[i])); |
| } |
| } |
| |
| private Tuple getKeyTuple(int distinctSeq, Tuple tuple) { |
| int[] columnIndexes = distinctKeyIndexes[distinctSeq]; |
| int keyLength = numGroupingColumns + columnIndexes.length + 1; |
| Tuple keyTuple = getKeyTuple(keyTupleMap, keyLength); |
| keyTuple.put(0, tuple.asDatum(0)); |
| for (int i = 0; i < numGroupingColumns; i++) { |
| keyTuple.put(i + 1, tuple.asDatum(i + 1)); |
| } |
| for (int i = 0; i < columnIndexes.length; i++) { |
| keyTuple.put(i + 1 + numGroupingColumns, tuple.asDatum(columnIndexes[i])); |
| } |
| |
| return keyTuple; |
| } |
| |
| private static Tuple getKeyTuple(Map<Integer, Tuple> keyTupleMap, Datum[] values) { |
| Tuple keyTuple = getKeyTuple(keyTupleMap, values.length); |
| keyTuple.put(values); |
| return keyTuple; |
| } |
| |
| private static Tuple getKeyTuple(Map<Integer, Tuple> keyTupleMap, int keyLength) { |
| Tuple keyTuple; |
| if (keyTupleMap.containsKey(keyLength)) { |
| keyTuple = keyTupleMap.get(keyLength); |
| } else { |
| keyTuple = new VTuple(keyLength); |
| keyTupleMap.put(keyLength, keyTuple); |
| } |
| return keyTuple; |
| } |
| |
| @Override |
| public void rescan() throws IOException { |
| super.rescan(); |
| prevKeyTuple = null; |
| prevTuple = null; |
| finished = false; |
| keyTupleMap.clear(); |
| prevKeyTupleMap.clear(); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| super.close(); |
| keyTupleMap.clear(); |
| prevKeyTupleMap.clear(); |
| prevKeyTuple = null; |
| prevTuple = null; |
| } |
| } |