blob: 0cfc15fd8bd7e4f74593d05ddbb95fe625bedf7d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.engine.planner.physical;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.plan.Target;
import org.apache.tajo.plan.expr.AggregationFunctionCallEval;
import org.apache.tajo.plan.function.FunctionContext;
import org.apache.tajo.plan.logical.DistinctGroupbyNode;
import org.apache.tajo.plan.logical.GroupbyNode;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.util.*;
/**
 * Final (third) stage of a multi-stage DISTINCT aggregation: aggregates the output of
 * DistinctGroupbySecondAggregationExec.
 *
 * <p>Each input tuple carries, at index 0, the sequence number of the distinct sub-plan it
 * belongs to (read with {@code getInt2(0)}), followed by this node's grouping columns at indexes
 * {@code 1..numGroupingColumns}. A group ends when a tuple's grouping key differs from the
 * previous tuple's key, so tuples sharing a key must arrive consecutively (the child is a
 * {@link SortExec}).
 */
public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec {
  private static final Log LOG = LogFactory.getLog(DistinctGroupbyThirdAggregationExec.class);

  private final DistinctGroupbyNode plan;
  private boolean finished = false;

  /** Aggregators of the distinct sub-plans, indexed by distinct sequence number. */
  private DistinctFinalAggregator[] aggregators;
  /** Aggregator of the non-distinct sub-plan, or null; it is driven by the seq-0 aggregator. */
  private DistinctFinalAggregator nonDistinctAggr;
  private int resultTupleLength;
  private int numGroupingColumns;
  /**
   * Maps an internal result position (grouping columns first, then aggregation results) to its
   * position in the output schema.
   */
  private int[] resultTupleIndexes;

  private Tuple outTuple;
  /** Reusable buffer holding the grouping key of the tuple currently being processed. */
  private Tuple keyTuple;
  private Tuple prevKeyTuple = null;
  private Tuple prevTuple = null;

  public DistinctGroupbyThirdAggregationExec(TaskAttemptContext context, DistinctGroupbyNode plan, SortExec sortExec)
      throws IOException {
    super(context, plan.getInSchema(), plan.getOutSchema(), sortExec);
    this.plan = plan;
  }

  @Override
  public void init() throws IOException {
    super.init();

    numGroupingColumns = plan.getGroupingColumns().length;
    resultTupleLength = numGroupingColumns;
    keyTuple = new VTuple(numGroupingColumns);

    List<GroupbyNode> groupbyNodes = plan.getSubPlans();
    buildAggregators(groupbyNodes);
    outTuple = new VTuple(resultTupleLength);
    resultTupleIndexes = buildOutputMapping(groupbyNodes);
  }

  /**
   * Creates one aggregator per distinct sub-plan (in sub-plan order) and one for the non-distinct
   * sub-plan, if present, and accumulates {@code resultTupleLength}.
   */
  private void buildAggregators(List<GroupbyNode> groupbyNodes) {
    List<DistinctFinalAggregator> aggregatorList = new ArrayList<>();
    int inTupleIndex = 1 + numGroupingColumns;
    int outTupleIndex = numGroupingColumns;
    int distinctSeq = 0;
    for (GroupbyNode eachGroupby : groupbyNodes) {
      if (eachGroupby.isDistinct()) {
        aggregatorList.add(new DistinctFinalAggregator(distinctSeq, inTupleIndex, outTupleIndex, eachGroupby));
        distinctSeq++;
        inTupleIndex += eachGroupby.getGroupingColumns().length;
      } else {
        nonDistinctAggr = new DistinctFinalAggregator(-1, inTupleIndex, outTupleIndex, eachGroupby);
      }
      int numAggFunctions = eachGroupby.getAggFunctions().size();
      outTupleIndex += numAggFunctions;
      resultTupleLength += numAggFunctions;
    }
    aggregators = aggregatorList.toArray(new DistinctFinalAggregator[aggregatorList.size()]);
  }

  /**
   * Builds the mapping from internal result positions to output schema positions.
   *
   * <p>Internal positions are this node's grouping columns, followed by every sub-plan's targets
   * that are not grouping columns of that sub-plan, in sub-plan order.
   *
   * @throws IOException if an output column has no matching internal result column
   */
  private int[] buildOutputMapping(List<GroupbyNode> groupbyNodes) throws IOException {
    Map<Column, Integer> groupbyResultTupleIndex = new HashMap<>();
    int resultTupleIndex = 0;
    for (Column eachColumn : plan.getGroupingColumns()) {
      groupbyResultTupleIndex.put(eachColumn, resultTupleIndex++);
    }
    for (GroupbyNode eachGroupby : groupbyNodes) {
      Set<Column> groupingColumnSet = new HashSet<>();
      Collections.addAll(groupingColumnSet, eachGroupby.getGroupingColumns());
      for (Target eachTarget : eachGroupby.getTargets()) {
        if (!groupingColumnSet.contains(eachTarget.getNamedColumn())) {
          // aggregation function result
          groupbyResultTupleIndex.put(eachTarget.getNamedColumn(), resultTupleIndex++);
        }
      }
    }

    // Match by qualified name rather than Column equality: for an avg aggregation the output
    // schema column type is float while the internal column type is protobuf.
    // putIfAbsent keeps the first entry in map iteration order, i.e. the first match a linear
    // search over the same map would find.
    Map<String, Integer> indexByQualifiedName = new HashMap<>();
    for (Map.Entry<Column, Integer> entry : groupbyResultTupleIndex.entrySet()) {
      indexByQualifiedName.putIfAbsent(entry.getKey().getQualifiedName(), entry.getValue());
    }

    int[] mapping = new int[outSchema.size()];
    int index = 0;
    for (Column eachOutputColumn : outSchema.getRootColumns()) {
      Integer matchedIndex = indexByQualifiedName.get(eachOutputColumn.getQualifiedName());
      if (matchedIndex == null) {
        throw new IOException("Can't find proper output column mapping: " + eachOutputColumn);
      }
      mapping[matchedIndex] = index++;
    }
    return mapping;
  }

  @Override
  public Tuple next() throws IOException {
    if (finished) {
      return null;
    }

    while (!context.isStopped()) {
      Tuple tuple = child.next();

      if (tuple == null) {
        // Input exhausted: flush the last group, if any.
        finished = true;
        if (prevTuple == null) {
          // No input at all: without grouping columns a single row of empty-aggregate results
          // is emitted; with grouping columns there is nothing to emit.
          return numGroupingColumns == 0 ? makeEmptyTuple() : null;
        }
        return fillResultTuple();
      }

      int distinctSeq = tuple.getInt2(0);
      Tuple currentKey = getGroupingKeyTuple(tuple);

      if (prevKeyTuple == null) {
        // First tuple: open the first group.
        prevKeyTuple = new VTuple(currentKey.getValues());
        prevTuple = new VTuple(tuple.getValues());
        aggregators[distinctSeq].merge(tuple);
        continue;
      }

      boolean groupChanged = !prevKeyTuple.equals(currentKey);
      if (groupChanged) {
        // Finalize the previous group before prevTuple is overwritten and the new tuple merged.
        fillResultTuple();
      }
      prevKeyTuple.put(currentKey.getValues());
      prevTuple.put(tuple.getValues());
      aggregators[distinctSeq].merge(tuple);
      if (groupChanged) {
        return outTuple;
      }
    }
    return null;
  }

  /**
   * Writes the previous group's grouping columns (taken from {@code prevTuple} at indexes
   * 1..numGroupingColumns) and every aggregator's final values into {@code outTuple}. Terminating
   * also gives each aggregator fresh function contexts for the next group.
   */
  private Tuple fillResultTuple() {
    for (int i = 0; i < numGroupingColumns; i++) {
      outTuple.put(resultTupleIndexes[i], prevTuple.asDatum(i + 1));
    }
    for (DistinctFinalAggregator eachAggr : aggregators) {
      eachAggr.terminate(outTuple);
    }
    return outTuple;
  }

  /** Fills {@code outTuple} with the results of aggregating no rows at all. */
  private Tuple makeEmptyTuple() {
    for (DistinctFinalAggregator eachAggr : aggregators) {
      eachAggr.terminateEmpty(outTuple);
    }
    return outTuple;
  }

  /** Copies the grouping columns (indexes 1..numGroupingColumns) of {@code tuple} into {@code keyTuple}. */
  private Tuple getGroupingKeyTuple(Tuple tuple) {
    for (int i = 0; i < numGroupingColumns; i++) {
      keyTuple.put(i, tuple.asDatum(i + 1));
    }
    return keyTuple;
  }

  @Override
  public void rescan() throws IOException {
    super.rescan();
    prevKeyTuple = null;
    prevTuple = null;
    finished = false;
    // Function contexts are otherwise only renewed on terminate; discard any partially merged
    // group so a mid-stream rescan does not leak it into the first group of the new scan.
    if (aggregators != null) {
      for (DistinctFinalAggregator eachAggr : aggregators) {
        eachAggr.reset();
      }
    }
    if (nonDistinctAggr != null) {
      nonDistinctAggr.reset();
    }
  }

  @Override
  public void close() throws IOException {
    super.close();
  }

  /**
   * Final-phase aggregator for one sub-plan. The distinct aggregator with sequence 0 also forwards
   * merge/terminate calls to {@code nonDistinctAggr}, so non-distinct aggregates are computed once
   * per group.
   */
  class DistinctFinalAggregator {
    private FunctionContext[] functionContexts;
    private final List<AggregationFunctionCallEval> aggrFunctions;
    /** Distinct sequence number, or -1 for the non-distinct aggregator. */
    private final int seq;
    /** Input offset computed by the outer class; not read within this class. */
    private final int inTupleIndex;
    /** First internal result position written by this aggregator. */
    private final int outTupleIndex;

    public DistinctFinalAggregator(int seq, int inTupleIndex, int outTupleIndex, GroupbyNode groupbyNode) {
      this.seq = seq;
      this.inTupleIndex = inTupleIndex;
      this.outTupleIndex = outTupleIndex;

      List<AggregationFunctionCallEval> functions = groupbyNode.getAggFunctions();
      // Normalize to an empty list so the size()-based loops below never see null.
      aggrFunctions = functions != null ? functions : Collections.<AggregationFunctionCallEval>emptyList();
      for (AggregationFunctionCallEval eachFunction : aggrFunctions) {
        eachFunction.bind(context.getEvalContext(), inSchema);
        eachFunction.setLastPhase();
      }
      newFunctionContext();
    }

    /** Allocates a fresh context for every aggregation function. */
    private void newFunctionContext() {
      functionContexts = new FunctionContext[aggrFunctions.size()];
      for (int i = 0; i < aggrFunctions.size(); i++) {
        functionContexts[i] = aggrFunctions.get(i).newContext();
      }
    }

    /** Discards any partially merged state. */
    void reset() {
      newFunctionContext();
    }

    public void merge(Tuple tuple) {
      for (int i = 0; i < aggrFunctions.size(); i++) {
        aggrFunctions.get(i).merge(functionContexts[i], tuple);
      }
      if (seq == 0 && nonDistinctAggr != null) {
        nonDistinctAggr.merge(tuple);
      }
    }

    /** Writes each function's current result to its mapped output position in {@code resultTuple}. */
    private void writeResults(Tuple resultTuple) {
      for (int i = 0; i < aggrFunctions.size(); i++) {
        resultTuple.put(resultTupleIndexes[outTupleIndex + i], aggrFunctions.get(i).terminate(functionContexts[i]));
      }
    }

    /** Emits the current group's results into {@code resultTuple}, then starts fresh contexts. */
    public void terminate(Tuple resultTuple) {
      writeResults(resultTuple);
      newFunctionContext();
      if (seq == 0 && nonDistinctAggr != null) {
        nonDistinctAggr.terminate(resultTuple);
      }
    }

    /** Emits the results of aggregating no rows (terminate on freshly created contexts). */
    public void terminateEmpty(Tuple resultTuple) {
      newFunctionContext();
      writeResults(resultTuple);
      if (seq == 0 && nonDistinctAggr != null) {
        nonDistinctAggr.terminateEmpty(resultTuple);
      }
    }
  }
}