blob: 58cfca4a7c5643590da9dfbcbcb58ae2875aa47b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.engine.planner.physical;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.plan.logical.DistinctGroupbyNode;
import org.apache.tajo.plan.logical.GroupbyNode;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
/**
 * Physical operator for a {@link DistinctGroupbyNode} that delegates the actual
 * aggregation to one {@link SortAggregateExec} per sub plan and merges the tuples
 * they return, column by column, into a single output tuple.
 *
 * <p>The position of every sub-executor column in the output tuple is taken from
 * {@link DistinctGroupbyNode#getResultColumnIds()}; columns that are not listed there
 * are dropped while merging.</p>
 *
 * <p>The same {@link Tuple} instance is returned from every call to {@link #next()};
 * callers that need to keep a row must copy it before asking for the next one.</p>
 */
public class DistinctGroupbySortAggregationExec extends PhysicalExec {
  /** One aggregation executor per sub plan, in sub-plan order. */
  private SortAggregateExec[] aggregateExecs;

  /** Set once the operator has produced its last tuple; cleared by {@link #rescan()}. */
  private boolean finished = false;

  /** The tuple most recently fetched from each sub-executor (same order as {@link #aggregateExecs}). */
  private Tuple[] currentTuples;
  private int outColumnNum;
  private int groupbyNodeNum;

  /**
   * Maps the position of a column in the concatenation of all sub-plan output schemas
   * to its position in the output tuple, or -1 when the column is not part of the output.
   */
  private int[] resultColumnIdIndexes;

  /** Reused output tuple; overwritten on every call to {@link #next()}. */
  private final Tuple outTuple;

  /**
   * Creates the operator and initializes every sub-executor.
   *
   * @param context        task attempt context passed on to {@link PhysicalExec}
   * @param plan           the distinct group-by plan node; supplies schemas, sub plans and
   *                       the result column ids
   * @param aggregateExecs one executor per sub plan of {@code plan}
   * @throws IOException if initializing a sub-executor fails
   */
  public DistinctGroupbySortAggregationExec(final TaskAttemptContext context, DistinctGroupbyNode plan,
                                            SortAggregateExec[] aggregateExecs) throws IOException {
    super(context, plan.getInSchema(), plan.getOutSchema());
    this.aggregateExecs = aggregateExecs;
    this.groupbyNodeNum = plan.getSubPlans().size();

    currentTuples = new Tuple[groupbyNodeNum];
    outColumnNum = outSchema.size();
    outTuple = new VTuple(outColumnNum);

    int allGroupbyOutColNum = 0;
    for (GroupbyNode eachGroupby: plan.getSubPlans()) {
      allGroupbyOutColNum += eachGroupby.getOutSchema().size();
    }

    // Start with "not part of the output" for every column, then fill in the mapped ones.
    resultColumnIdIndexes = new int[allGroupbyOutColNum];
    for (int i = 0; i < allGroupbyOutColNum; i++) {
      resultColumnIdIndexes[i] = -1;
    }

    int[] resultColumnIds = plan.getResultColumnIds();
    for (int i = 0; i < resultColumnIds.length; i++) {
      resultColumnIdIndexes[resultColumnIds[i]] = i;
    }

    for (SortAggregateExec eachExec: aggregateExecs) {
      eachExec.init();
    }
  }

  /**
   * True until the first pass over the sub-executors has completed; restored by
   * {@link #rescan()} so that a rescanned operator behaves like a fresh one.
   */
  boolean first = true;

  /**
   * Fetches one tuple from every sub-executor and merges them into the output tuple.
   *
   * @return the merged tuple, a tuple of default values when there is no input at all and
   *         the first sub-executor has no grouping keys, or {@code null} when exhausted
   * @throws IOException if a sub-executor fails
   */
  @Override
  public Tuple next() throws IOException {
    if (finished) {
      return null;
    }

    boolean allNull = true;

    for (int i = 0; i < groupbyNodeNum; i++) {
      if (first && i > 0) {
        // All SortAggregateExec instances use the same SeqScanExec object.
        // After the previous executor has run its sort, rescan() must be called.
        if (currentTuples[i-1] != null) {
          aggregateExecs[i].rescan();
        }
      }
      currentTuples[i] = aggregateExecs[i].next();

      if (currentTuples[i] != null) {
        allNull = false;
      }
    }

    // If no sub-executor produced a tuple on the very first call and there are no
    // grouping keys, a single tuple of default values must be returned instead of nothing.
    if (allNull && aggregateExecs[0].groupingKeyNum == 0 && first) {
      return getEmptyTuple();
    }

    first = false;

    if (allNull) {
      finished = true;
      return null;
    }

    // mergeTupleIndex walks the concatenation of all sub-executor tuples.
    int mergeTupleIndex = 0;
    for (int i = 0; i < currentTuples.length; i++) {
      int tupleSize = currentTuples[i].size();
      for (int j = 0; j < tupleSize; j++) {
        if (resultColumnIdIndexes[mergeTupleIndex] >= 0) {
          outTuple.put(resultColumnIdIndexes[mergeTupleIndex], currentTuples[i].asDatum(j));
        }
        mergeTupleIndex++;
      }
    }
    return outTuple;
  }

  /**
   * Builds the tuple returned when there is no input and no grouping key, and marks the
   * operator as finished.
   *
   * <p>min, max, avg and sum yield a null datum. Any other function yields a datum of the
   * output column's numeric type (INT8, INT4, INT2, FLOAT4 or FLOAT8) built from the
   * corresponding {@code NullDatum.asXxx()} value, or a null datum for all other types.
   * Columns without an output slot are skipped, as in {@link #next()}.</p>
   *
   * NOTE(review): the numeric value emitted here is whatever NullDatum's accessor returns;
   * confirm that it matches the expected empty-input result of the function.
   *
   * @return the shared output tuple filled with default values
   */
  private Tuple getEmptyTuple() {
    NullDatum nullDatum = DatumFactory.createNullDatum();

    int tupleIndex = 0;
    for (SortAggregateExec aggExec: aggregateExecs) {
      for (int i = 0; i < aggExec.aggFunctionsNum; i++, tupleIndex++) {
        int outIndex = resultColumnIdIndexes[tupleIndex];
        if (outIndex < 0) {
          // Not part of the output schema; nothing to fill in.
          continue;
        }

        String funcName = aggExec.aggFunctions[i].getName();
        if ("min".equals(funcName) || "max".equals(funcName) || "avg".equals(funcName) || "sum".equals(funcName)) {
          outTuple.put(outIndex, DatumFactory.createNullDatum());
        } else {
          TajoDataTypes.Type type = outSchema.getColumn(outIndex).getDataType().getType();
          if (type == TajoDataTypes.Type.INT8) {
            outTuple.put(outIndex, DatumFactory.createInt8(nullDatum.asInt8()));
          } else if (type == TajoDataTypes.Type.INT4) {
            outTuple.put(outIndex, DatumFactory.createInt4(nullDatum.asInt4()));
          } else if (type == TajoDataTypes.Type.INT2) {
            outTuple.put(outIndex, DatumFactory.createInt2(nullDatum.asInt2()));
          } else if (type == TajoDataTypes.Type.FLOAT4) {
            outTuple.put(outIndex, DatumFactory.createFloat4(nullDatum.asFloat4()));
          } else if (type == TajoDataTypes.Type.FLOAT8) {
            outTuple.put(outIndex, DatumFactory.createFloat8(nullDatum.asFloat8()));
          } else {
            outTuple.put(outIndex, DatumFactory.createNullDatum());
          }
        }
      }
    }

    finished = true;
    first = false;

    return outTuple;
  }

  /**
   * Closes every sub-executor. A failure in one of them does not prevent the remaining
   * ones from being closed; the first {@link IOException} encountered is rethrown afterwards.
   *
   * @throws IOException the first failure reported by a sub-executor, if any
   */
  @Override
  public void close() throws IOException {
    if (aggregateExecs == null) {
      return;
    }

    IOException firstFailure = null;
    for (SortAggregateExec eachExec: aggregateExecs) {
      try {
        eachExec.close();
      } catch (IOException e) {
        // Keep going so the other executors still get released.
        if (firstFailure == null) {
          firstFailure = e;
        }
      }
    }

    if (firstFailure != null) {
      throw firstFailure;
    }
  }

  /** Nothing to do here: the sub-executors are initialized in the constructor. */
  @Override
  public void init() throws IOException {
  }

  /**
   * Resets this operator and all sub-executors so the result can be produced again.
   * Both {@code finished} and {@code first} are restored, so the next pass repeats the
   * first-call handling in {@link #next()} (rewinding later sub-executors and emitting the
   * default tuple for empty input).
   *
   * @throws IOException if rescanning a sub-executor fails
   */
  @Override
  public void rescan() throws IOException {
    finished = false;
    first = true;
    for (int i = 0; i < groupbyNodeNum; i++) {
      aggregateExecs[i].rescan();
    }
  }

  /**
   * @return 1.0 once finished, otherwise the progress reported by the last sub-executor
   */
  @Override
  public float getProgress() {
    if (finished) {
      return 1.0f;
    } else {
      return aggregateExecs[aggregateExecs.length - 1].getProgress();
    }
  }

  /**
   * @return the input statistics reported by the last sub-executor
   */
  @Override
  public TableStats getInputStats() {
    return aggregateExecs[aggregateExecs.length - 1].getInputStats();
  }
}