// blob: 5288776053ea68e5232fd8411ebb3af62b99e4e5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.window;
import org.apache.drill.common.exceptions.DrillException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.config.WindowPOP;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.BaseDataValueVector;
import org.apache.drill.exec.vector.ValueVector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Named;
import java.util.List;
/**
* WindowFramer implementation that supports the FRAME clause.
* <br>According to the SQL specification, FIRST_VALUE, LAST_VALUE and all aggregate functions support the FRAME clause.
* This class will handle such functions even if the FRAME clause is not present.
*/
public abstract class FrameSupportTemplate implements WindowFramer {
private static final Logger logger = LoggerFactory.getLogger(FrameSupportTemplate.class);
// outgoing container handed to setup(); passed as "outgoing" to most of the generated setup methods
private VectorContainer container;
// container created in setup() with one vector per field of the outgoing container; its data vectors are reset by reset()
private VectorContainer internal;
// list of batches handed to setup(); doWork() processes the first one
private List<WindowDataBatch> batches;
private int outputCount; // number of rows in currently/last processed batch
// batch being processed by doWork(), i.e. batches.get(0)
private WindowDataBatch current;
// index of the last row evaluated by aggregatePeers(), relative to the batch that row belongs to
private int frameLastRow;
// true when at least one window function needs to process all batches of a partition before passing any batch downstream
private boolean requireFullPartition;
private long remainingRows; // num unprocessed rows in current partition
private long remainingPeers; // num unprocessed peer rows in current frame
private boolean partialPartition; // true if remainingRows only accounts for the current batch and more batches are expected for the current partition
// window operator configuration handed to setup(); read for the frame units and the frame start/end bounds
private WindowPOP popConfig;
/**
 * Binds this framer to its incoming batches and outgoing container, then creates and allocates the
 * internal container.
 *
 * @param batches list of incoming batches; {@code doWork()} processes the first one
 * @param container outgoing container
 * @param oContext operator context used to create the internal container
 * @param requireFullPartition when false, the partition size is only computed for the current batch
 * @param popConfig window operator configuration (frame units and bounds)
 * @throws SchemaChangeException declared in the signature; not thrown directly by this method's body
 */
@Override
public void setup(final List<WindowDataBatch> batches, final VectorContainer container, final OperatorContext oContext,
    final boolean requireFullPartition, final WindowPOP popConfig) throws SchemaChangeException {
  this.batches = batches;
  this.container = container;

  // the internal container must exist before allocateInternal() can populate it
  this.internal = new VectorContainer(oContext);
  allocateInternal();

  this.popConfig = popConfig;
  this.requireFullPartition = requireFullPartition;
  this.outputCount = 0;
}
/**
 * Adds (or fetches) in the internal container one vector per field of the outgoing container and
 * allocates it.
 */
private void allocateInternal() {
  for (VectorWrapper<?> wrapper : container) {
    internal.addOrGet(wrapper.getField()).allocateNew();
  }
}
/**
 * @return true when the current partition has no unprocessed rows left and is not flagged as partial
 */
private boolean isPartitionDone() {
  return remainingRows == 0 && !partialPartition;
}
/**
 * Processes every row of the first batch, partition by partition.
 *
 * @throws DrillException propagated from the partition processing methods
 */
@Override
public void doWork() throws DrillException {
  current = batches.get(0);
  setupSaveFirstValue(current, internal);
  outputCount = current.getRecordCount();

  // the row index is advanced by processPartition(), hence the empty update clause
  for (int row = 0; row < outputCount; ) {
    if (isPartitionDone()) {
      newPartition(current, row);
    } else {
      // a partition left unfinished by a previous call to doWork() continues in this batch
      assert row == 0 : "pending partitions are only expected at the start of the batch";
      logger.trace("we have a pending partition {}", remainingRows);

      if (!requireFullPartition) {
        // only part of the partition was counted last time, add the rows of this batch to its length
        updatePartitionSize(row);
      }
    }

    row = processPartition(row);

    if (isPartitionDone()) {
      reset();
    }
  }
}
/**
 * Starts a new partition at the given row: zeroes the row and peer counters, computes the partition
 * size, sets up the partition and saves the first value.
 *
 * @param current batch passed to {@code setupPartition}
 * @param currentRow first row of the new partition
 * @throws SchemaChangeException if {@code setupPartition} fails
 */
private void newPartition(final WindowDataBatch current, final int currentRow) throws SchemaChangeException {
  remainingPeers = 0;
  remainingRows = 0;

  updatePartitionSize(currentRow);
  setupPartition(current, container);
  saveFirstValue(currentRow);
}
/**
 * Resets the window functions and every {@code BaseDataValueVector} held by the internal container.
 */
private void reset() {
  resetValues();

  for (VectorWrapper<?> wrapper : internal) {
    final ValueVector vector = wrapper.getValueVector();
    if (vector instanceof BaseDataValueVector) {
      ((BaseDataValueVector) vector).reset();
    }
  }
}
/**
 * Processes the rows of the current batch that belong to the current partition, dispatching on the
 * frame units (ROWS or RANGE) of the window configuration.
 *
 * @param currentRow first unprocessed row
 * @return index of next unprocessed row
 * @throws DrillException if it can't write into the container
 */
private int processPartition(final int currentRow) throws DrillException {
  logger.trace("{} rows remaining to process, currentRow: {}, outputCount: {}", remainingRows, currentRow, outputCount);

  setupWriteFirstValue(internal, container);

  return popConfig.isFrameUnitsRows()
    ? processROWS(currentRow)
    : processRANGE(currentRow);
}
/**
 * Processes the rows of the current partition one at a time: each row is evaluated, output, and
 * used as its own last value.
 *
 * @param row first unprocessed row
 * @return index of next unprocessed row
 * @throws DrillException propagated from the setup methods
 */
private int processROWS(int row) throws DrillException {
  //TODO (DRILL-4413) we only need to call these once per batch
  setupEvaluatePeer(current, container);
  setupReadLastValue(current, container);

  for (; row < outputCount && !isPartitionDone(); remainingRows--, row++) {
    logger.trace("aggregating row {}", row);
    evaluatePeer(row);

    outputRow(row);
    writeLastValue(row, row);
  }

  return row;
}
/**
 * Processes the rows of the current partition frame by frame: the peers of a frame are aggregated
 * once, then every peer row is output from that single aggregation.
 *
 * @param row first unprocessed row
 * @return index of next unprocessed row
 * @throws DrillException propagated from the peer aggregation
 */
private int processRANGE(int row) throws DrillException {
  for (; row < outputCount && !isPartitionDone(); row++) {
    final boolean frameExhausted = remainingPeers == 0;
    if (frameExhausted) {
      // because all peer rows share the same frame, we only need to compute and aggregate the frame once
      if (popConfig.getStart().isCurrent()) {
        // the frame starts at the current row: start over from this row
        reset();
        saveFirstValue(row);
      }

      remainingPeers = aggregatePeers(row);
    }

    outputRow(row);
    writeLastValue(frameLastRow, row);

    remainingPeers--;
    remainingRows--;
  }

  return row;
}
/**
 * Adds to {@code remainingRows} the number of rows of the partition that starts at the specified row
 * of the first batch, and updates {@code partialPartition}. When {@code requireFullPartition} is
 * false only the rows of the first batch are counted.
 *
 * @param start row of the first batch the count starts from
 */
private void updatePartitionSize(final int start) {
  logger.trace("compute partition size starting from {} on {} batches", start, batches.size());

  long counted = 0;
  int idx = start;

  // walk the batches until a row of another partition shows up or the batches run out;
  // the first batch is scanned from start, the following ones from their first row
  scan:
  for (WindowDataBatch batch : batches) {
    final int rows = batch.getRecordCount();

    while (idx < rows) {
      if (!isSamePartition(start, current, idx, batch)) {
        break scan;
      }
      idx++;
      counted++;
    }

    if (!requireFullPartition) {
      // we are only interested in the first batch's records
      break;
    }

    idx = 0;
  }

  if (requireFullPartition) {
    partialPartition = false;
  } else {
    // the partition continues past this batch only if it reached the end of the batch,
    // another batch is available, and that batch starts with the same partition
    partialPartition = idx >= outputCount
      && batches.size() != 1
      && isSamePartition(start, current, 0, batches.get(1));
  }

  remainingRows += counted;
}
/**
 * Evaluates every row of the frame that starts at the given row and records where the frame ends.
 *
 * @param start starting row of the current frame
 * @return num peer rows for current row
 * @throws SchemaChangeException if one of the setup methods fails
 */
private long aggregatePeers(final int start) throws SchemaChangeException {
  logger.trace("aggregating rows starting from {}", start);

  final boolean unboundedFollowing = popConfig.getEnd().isUnbounded();
  VectorAccessible lastBatch = current;
  long peers = 0;

  // a single frame can include rows from multiple batches:
  // the current batch is scanned from start, any other batch from its first row
  for (WindowDataBatch batch : batches) {
    setupEvaluatePeer(batch, container);
    final int rows = batch.getRecordCount();

    int row = (batch == current) ? start : 0;
    while (row < rows) {
      // an unbounded frame ends with the partition, otherwise it ends at the first non-peer row
      final boolean frameEnded = unboundedFollowing
        ? peers >= remainingRows
        : !isPeer(start, current, row, batch);
      if (frameEnded) {
        // only leaves the scan of this batch; the following batches are still visited
        break;
      }

      evaluatePeer(row);
      lastBatch = batch;
      frameLastRow = row;

      row++;
      peers++;
    }
  }

  setupReadLastValue(lastBatch, container);

  return peers;
}
/**
 * @return number of rows in the batch currently or last processed by {@code doWork()}
 */
@Override
public int getOutputCount() {
  return this.outputCount;
}
/**
 * Clears the internal container created by {@code setup()}.
 */
@Override
public void cleanup() {
  logger.trace("clearing internal");
  this.internal.clear();
}
/**
 * @return a description of the framer state: internal container, output count, current batch,
 *         last frame row, remaining rows and the partial partition flag
 */
@Override
public String toString() {
  final StringBuilder text = new StringBuilder("FrameSupportTemplate[internal=");
  text.append(internal);
  text.append(", outputCount=").append(outputCount);
  text.append(", current=").append(current);
  text.append(", frameLastRow=").append(frameLastRow);
  text.append(", remainingRows=").append(remainingRows);
  text.append(", partialPartition=").append(partialPartition);
  text.append("]");
  return text.toString();
}
/**
* called once for each peer row of the current frame.
* @param index of row to aggregate
*/
public abstract void evaluatePeer(@Named("index") int index);
/**
* Called before {@link #evaluatePeer(int)}: once per call of processROWS() with the current batch, and once per
* batch visited by aggregatePeers().
* @param incoming batch the peer rows will be read from
* @param outgoing this class always passes the outgoing container
*
* @throws SchemaChangeException
*/
public abstract void setupEvaluatePeer(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
/**
* Called before {@link #writeLastValue(int, int)}: with the current batch in processROWS(), and with the batch that
* holds the last row of the frame at the end of aggregatePeers().
* @param incoming batch that holds the last row of the frame
* @param outgoing this class always passes the outgoing container
*
* @throws SchemaChangeException
*/
public abstract void setupReadLastValue(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
/**
* Called once for each processed row, after {@link #outputRow(int)}.
* @param index row the last value is read from: the row itself for ROWS frames, the last row of the frame for RANGE frames
* @param outIndex row being output
*/
public abstract void writeLastValue(@Named("index") int index, @Named("outIndex") int outIndex);
/**
* Called at the start of doWork(), before {@link #saveFirstValue(int)}.
* @param incoming this class passes the batch being processed
* @param outgoing this class passes the internal container
*
* @throws SchemaChangeException
*/
public abstract void setupSaveFirstValue(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
/**
* Called for the first row of a new partition and, when the frame starts at the current row, for the first row of
* each RANGE frame.
* @param index of the row to save
*/
public abstract void saveFirstValue(@Named("index") int index);
/**
* Called at the start of processPartition().
* @param incoming this class passes the internal container
* @param outgoing this class passes the outgoing container
*/
public abstract void setupWriteFirstValue(@Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing);
/**
* called once for each row after we evaluate all peer rows. Used to write a value in the row
*
* @param outIndex index of row
*/
public abstract void outputRow(@Named("outIndex") int outIndex);
/**
* Called once per partition, before processing the partition. Used to setup read/write vectors
* @param incoming batch we will read from
* @param outgoing batch we will be writing to
*
* @throws SchemaChangeException
*/
public abstract void setupPartition(@Named("incoming") WindowDataBatch incoming,
@Named("outgoing") VectorAccessible outgoing) throws SchemaChangeException;
/**
* reset all window functions
* @return a boolean that reset() in this class ignores
*/
public abstract boolean resetValues();
/**
* compares two rows from different batches (can be the same), if they have the same value for the partition by
* expression
* @param b1Index index of first row
* @param b1 batch for first row
* @param b2Index index of second row
* @param b2 batch for second row
* @return true if the rows are in the same partition
*/
public abstract boolean isSamePartition(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1,
@Named("b2Index") int b2Index, @Named("b2") VectorAccessible b2);
/**
* compares two rows from different batches (can be the same), if they have the same value for the order by
* expression
* @param b1Index index of first row
* @param b1 batch for first row
* @param b2Index index of second row
* @param b2 batch for second row
* @return true if the rows are peers, i.e. they have the same value for the order by expression
*/
public abstract boolean isPeer(@Named("b1Index") int b1Index, @Named("b1") VectorAccessible b1,
@Named("b2Index") int b2Index, @Named("b2") VectorAccessible b2);
}