blob: c09e85ea5dfb390ea49496d4e15d4015b2c612b3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.engine.planner.physical;
import com.google.common.collect.Lists;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.SortSpec;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.plan.expr.WindowFunctionEval;
import org.apache.tajo.plan.function.FunctionContext;
import org.apache.tajo.plan.logical.WindowAggNode;
import org.apache.tajo.plan.logical.WindowSpec;
import org.apache.tajo.storage.BaseTupleComparator;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.TupleComparator;
import org.apache.tajo.storage.VTuple;
import org.apache.tajo.worker.TaskAttemptContext;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
/**
 * The sort-based window aggregation operator.
 *
 * <p>Input tuples are buffered for as long as their partition key stays equal to the
 * previous tuple's key. When the key changes (or the child is exhausted) the buffered
 * window is evaluated: non-function columns are projected, every window function is
 * merged over the buffered tuples, and the resulting tuples are then handed out one by
 * one from {@link #next()}. The tuple that opened the new partition is kept aside and
 * becomes the first tuple of the next window.
 *
 * <p>The operator closes a window on every key change, so it relies on the child
 * delivering tuples already grouped by partition key.
 */
public class WindowAggExec extends UnaryPhysicalExec {
  // plan information
  protected final int outputColumnNum;
  protected final int nonFunctionColumnNum;
  protected final int[] nonFunctionColumns;
  protected final int functionNum;
  protected final WindowFunctionEval[] functions;

  // output schema extended with sort keys that are not part of the output
  protected Schema schemaForOrderBy;
  // never null; empty when there are no extra sort keys (or no functions at all)
  protected int[] sortKeyColumns;

  protected final boolean hasPartitionKeys;
  protected final int partitionKeyNum;
  // column ids of the partition keys in the input schema; null when there are none
  protected final int[] partitionKeyIds;

  // for evaluation
  protected FunctionContext[] contexts;
  protected Tuple lastKey = null;
  protected boolean noMoreTuples = false;
  private boolean[] orderedFuncFlags;
  private boolean[] aggFuncFlags;
  private boolean[] windowFuncFlags;
  // The two end-bound flag arrays are filled in by the constructor but are not read
  // anywhere else in this class.
  private boolean[] endUnboundedFollowingFlags;
  private boolean[] endCurrentRowFlags;

  // partition key of the most recently read input tuple (reused across reads)
  private Tuple currentKey;
  // scratch tuple used while projecting buffered input tuples
  private Tuple evaluatedTuple;

  // operator state
  enum WindowState {
    NEW_WINDOW,
    ACCUMULATING_WINDOW,
    EVALUATION,
    RETRIEVING_FROM_WINDOW,
    END_OF_TUPLE
  }

  // Transient state
  boolean firstTime = true;
  TupleList evaluatedTuples = null;
  TupleList accumulatedInTuples = null;
  TupleList nextAccumulatedInTuples = null;
  WindowState state = WindowState.NEW_WINDOW;
  Iterator<Tuple> tupleInFrameIterator = null;

  /**
   * Builds the operator from the logical plan node.
   *
   * @param context the task attempt context passed on to the parent class
   * @param plan    the logical window aggregation node providing schemas, partition keys,
   *                window functions and targets
   * @param child   the child operator producing the input tuples
   * @throws IOException declared for compatibility with the parent constructor
   */
  public WindowAggExec(TaskAttemptContext context, WindowAggNode plan, PhysicalExec child) throws IOException {
    super(context, plan.getInSchema(), plan.getOutSchema(), child);

    if (plan.hasPartitionKeys()) {
      final Column[] keyColumns = plan.getPartitionKeys();
      partitionKeyNum = keyColumns.length;
      partitionKeyIds = new int[partitionKeyNum];
      for (int idx = 0; idx < partitionKeyNum; idx++) {
        partitionKeyIds[idx] = inSchema.getColumnId(keyColumns[idx].getQualifiedName());
      }
      hasPartitionKeys = true;
    } else {
      partitionKeyNum = 0;
      partitionKeyIds = null;
      hasPartitionKeys = false;
    }
    currentKey = new VTuple(partitionKeyNum);

    if (plan.hasAggFunctions()) {
      functions = plan.getWindowFunctions();
      functionNum = functions.length;

      orderedFuncFlags = new boolean[functions.length];
      windowFuncFlags = new boolean[functions.length];
      aggFuncFlags = new boolean[functions.length];
      endUnboundedFollowingFlags = new boolean[functions.length];
      endCurrentRowFlags = new boolean[functions.length];

      // sort keys referenced by some function but absent from the output schema
      List<Column> additionalSortKeyColumns = Lists.newArrayList();
      Schema rewrittenSchema = new Schema(outSchema);

      for (int i = 0; i < functions.length; i++) {
        WindowSpec.WindowEndBound endBound = functions[i].getWindowFrame().getEndBound();
        switch (endBound.getBoundType()) {
          case CURRENT_ROW:
            endCurrentRowFlags[i] = true;
            break;
          case UNBOUNDED_FOLLOWING:
            endUnboundedFollowingFlags[i] = true;
            break;
          default:
            // other end bounds set no flag
            break;
        }

        switch (functions[i].getFuncDesc().getFuncType()) {
          case AGGREGATION:
          case DISTINCT_AGGREGATION:
            aggFuncFlags[i] = true;
            break;
          case WINDOW:
            windowFuncFlags[i] = true;
            break;
          default:
            // other function types set no flag
            break;
        }

        if (functions[i].hasSortSpecs()) {
          orderedFuncFlags[i] = true;

          for (SortSpec sortSpec : functions[i].getSortSpecs()) {
            Column sortKey = sortSpec.getSortKey();
            // register each missing sort key only once
            if (!rewrittenSchema.contains(sortKey) && !additionalSortKeyColumns.contains(sortKey)) {
              additionalSortKeyColumns.add(sortKey);
            }
          }
        }
      }

      sortKeyColumns = new int[additionalSortKeyColumns.size()];
      schemaForOrderBy = new Schema(outSchema);
      for (int i = 0; i < additionalSortKeyColumns.size(); i++) {
        // NOTE(review): this stores the position i rather than the column's id in the
        // input schema, yet evaluationWindowFrame() uses the value as an index into the
        // input tuple. Kept as in the original; confirm this is intended.
        sortKeyColumns[i] = i;
        schemaForOrderBy.addColumn(additionalSortKeyColumns.get(i));
      }
    } else {
      functions = new WindowFunctionEval[0];
      functionNum = 0;
      schemaForOrderBy = outSchema;
      // evaluationWindowFrame() reads sortKeyColumns.length for every buffered tuple,
      // so the array must exist even when there is nothing to sort by.
      sortKeyColumns = new int[0];
    }

    evaluatedTuple = new VTuple(schemaForOrderBy.size());

    // the targets are laid out as [non-function columns..., function results...]
    nonFunctionColumnNum = plan.getTargets().length - functionNum;
    nonFunctionColumns = new int[nonFunctionColumnNum];
    for (int idx = 0; idx < nonFunctionColumnNum; idx++) {
      nonFunctionColumns[idx] = inSchema.getColumnId(plan.getTargets()[idx].getCanonicalName());
    }

    outputColumnNum = nonFunctionColumnNum + functionNum;
  }

  /**
   * Initializes the parent operator and binds every window function to the input schema.
   */
  @Override
  public void init() throws IOException {
    super.init();
    for (EvalNode functionEval : functions) {
      functionEval.bind(context.getEvalContext(), inSchema);
    }
  }

  private void transition(WindowState state) {
    this.state = state;
  }

  /**
   * Returns the next evaluated tuple.
   *
   * <p>Drives the state machine: input is read and buffered while in
   * {@code ACCUMULATING_WINDOW}; a key change or end of input moves to {@code EVALUATION};
   * the evaluated tuples are then returned one per call in {@code RETRIEVING_FROM_WINDOW}.
   *
   * @return the next output tuple, or {@code null} when the input is exhausted or the
   *         context has been stopped
   */
  @Override
  public Tuple next() throws IOException {
    Tuple readTuple = null;

    while (!context.isStopped() && state != WindowState.END_OF_TUPLE) {

      if (state == WindowState.NEW_WINDOW) {
        initWindow();
        transition(WindowState.ACCUMULATING_WINDOW);
      }

      if (state != WindowState.RETRIEVING_FROM_WINDOW) { // read an input tuple and build a partition key
        readTuple = child.next();

        if (readTuple == null) { // the end of tuple
          noMoreTuples = true;
          transition(WindowState.EVALUATION);
        }

        if (readTuple != null && hasPartitionKeys) { // get a key tuple
          for (int i = 0; i < partitionKeyIds.length; i++) {
            currentKey.put(i, readTuple.asDatum(partitionKeyIds[i]));
          }
        }
      }

      // readTuple is non-null here: a null read has already switched the state to EVALUATION
      if (state == WindowState.ACCUMULATING_WINDOW) {
        accumulatingWindow(currentKey, readTuple);
      }

      if (state == WindowState.EVALUATION) {
        evaluationWindowFrame();

        tupleInFrameIterator = evaluatedTuples.iterator();
        transition(WindowState.RETRIEVING_FROM_WINDOW);
      }

      if (state == WindowState.RETRIEVING_FROM_WINDOW) {
        if (tupleInFrameIterator.hasNext()) {
          return tupleInFrameIterator.next();
        } else {
          finalizeWindow();
        }
      }
    }

    return null;
  }

  /**
   * Allocates the input buffer and the function contexts the first time a window is
   * opened (and again after {@link #rescan()}); later windows reuse what
   * {@link #finalizeWindow()} prepared.
   */
  private void initWindow() {
    if (firstTime) {
      accumulatedInTuples = new TupleList();
      contexts = newFunctionContexts();
      firstTime = false;
    }
  }

  /** Creates one fresh context per window function. */
  private FunctionContext[] newFunctionContexts() {
    FunctionContext[] fresh = new FunctionContext[functionNum];
    for (int evalIdx = 0; evalIdx < functionNum; evalIdx++) {
      fresh[evalIdx] = functions[evalIdx].newContext();
    }
    return fresh;
  }

  /**
   * Buffers {@code inTuple} if it belongs to the current window; otherwise sets it aside
   * for the next window and switches to {@code EVALUATION}.
   */
  private void accumulatingWindow(Tuple currentKey, Tuple inTuple) {

    if (lastKey == null || lastKey.equals(currentKey)) { // if the current key is same to the previous key
      accumulatedInTuples.add(inTuple);
      if (lastKey == null) {
        lastKey = new VTuple(currentKey.size());
      }
    } else {
      // if the current key is different from the previous key,
      // the current key belongs to the next window frame. preaccumulatingNextWindow() will
      // aggregate the current key for next window frame.
      preAccumulatingNextWindow(inTuple);
      transition(WindowState.EVALUATION);
    }

    // remember the key just seen; currentKey itself is overwritten on the next read
    lastKey.put(currentKey.getValues());
  }

  /** Starts the buffer of the next window with the tuple that triggered the key change. */
  private void preAccumulatingNextWindow(Tuple inTuple) {
    nextAccumulatedInTuples = new TupleList();
    nextAccumulatedInTuples.add(inTuple);
  }

  /**
   * Evaluates the buffered window into {@link #evaluatedTuples}.
   *
   * <p>First every buffered input tuple is projected to an output tuple. Then, per
   * function, both lists are sorted by the function's sort specs (if any), the function is
   * merged over the input tuples in order, and its result is written to the function's
   * output column.
   */
  private void evaluationWindowFrame() {
    TupleComparator comp;

    evaluatedTuples = new TupleList();

    for (int i = 0; i < accumulatedInTuples.size(); i++) {
      Tuple inTuple = accumulatedInTuples.get(i);

      for (int c = 0; c < nonFunctionColumnNum; c++) {
        evaluatedTuple.put(c, inTuple.asDatum(nonFunctionColumns[c]));
      }
      // extra sort keys are appended after the regular output columns
      for (int c = 0; c < sortKeyColumns.length; c++) {
        evaluatedTuple.put(outputColumnNum + c, inTuple.asDatum(sortKeyColumns[c]));
      }

      // NOTE(review): the same scratch tuple is added on every iteration, which is only
      // correct if TupleList.add stores a copy - confirm against TupleList.
      evaluatedTuples.add(evaluatedTuple);
    }

    for (int idx = 0; idx < functions.length; idx++) {
      if (orderedFuncFlags[idx]) {
        // Sort both lists by the same sort specs so that position i in one list still
        // corresponds to position i in the other.
        comp = new BaseTupleComparator(inSchema, functions[idx].getSortSpecs());
        Collections.sort(accumulatedInTuples, comp);
        comp = new BaseTupleComparator(schemaForOrderBy, functions[idx].getSortSpecs());
        Collections.sort(evaluatedTuples, comp);
      }

      for (int i = 0; i < accumulatedInTuples.size(); i++) {
        Tuple inTuple = accumulatedInTuples.get(i);
        Tuple outTuple = evaluatedTuples.get(i);

        functions[idx].merge(contexts[idx], inTuple);

        // window functions emit a value per row, right after that row has been merged
        if (windowFuncFlags[idx]) {
          Datum result = functions[idx].terminate(contexts[idx]);
          outTuple.put(nonFunctionColumnNum + idx, result);
        }
      }

      // aggregation functions are terminated only after the whole window has been merged
      if (aggFuncFlags[idx]) {
        for (int i = 0; i < evaluatedTuples.size(); i++) {
          Datum result = functions[idx].terminate(contexts[idx]);
          Tuple outTuple = evaluatedTuples.get(i);
          outTuple.put(nonFunctionColumnNum + idx, result);
        }
      }
    }
  }

  /**
   * Releases the window that has just been fully returned. Ends the scan if the input is
   * exhausted; otherwise promotes the pre-accumulated tuple list to the current window and
   * resets the function contexts.
   */
  private void finalizeWindow() {
    evaluatedTuples.clear();
    accumulatedInTuples.clear();

    if (noMoreTuples) {
      transition(WindowState.END_OF_TUPLE);
    } else {
      accumulatedInTuples = nextAccumulatedInTuples;
      contexts = newFunctionContexts();
      transition(WindowState.NEW_WINDOW);
    }
  }

  /**
   * Restarts the scan. Besides rescanning the child, all transient operator state is
   * reset; otherwise a finished operator would stay in {@code END_OF_TUPLE} and return
   * nothing, and a partially consumed one would mix stale buffered tuples into the new scan.
   */
  @Override
  public void rescan() throws IOException {
    super.rescan();

    lastKey = null;
    noMoreTuples = false;

    if (evaluatedTuples != null) {
      evaluatedTuples.clear();
      evaluatedTuples = null;
    }
    if (accumulatedInTuples != null) {
      accumulatedInTuples.clear();
      accumulatedInTuples = null;
    }
    if (nextAccumulatedInTuples != null) {
      nextAccumulatedInTuples.clear();
      nextAccumulatedInTuples = null;
    }
    tupleInFrameIterator = null;

    // make initWindow() allocate a fresh input buffer and fresh function contexts
    firstTime = true;
    transition(WindowState.NEW_WINDOW);
  }

  /**
   * Clears and drops all tuple buffers, then closes the parent operator.
   */
  @Override
  public void close() throws IOException {
    if (evaluatedTuples != null) {
      evaluatedTuples.clear();
      evaluatedTuples = null;
    }

    if (accumulatedInTuples != null) {
      accumulatedInTuples.clear();
      accumulatedInTuples = null;
    }

    if (nextAccumulatedInTuples != null) {
      nextAccumulatedInTuples.clear();
      nextAccumulatedInTuples = null;
    }
    super.close();
  }
}