blob: b1c6e3b907388f263428382f326b1ecb8f5ca6c0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#include "expressions/window_aggregation/WindowAggregationHandleAvg.hpp"
#include <cstddef>
#include <memory>
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
#include "expressions/window_aggregation/WindowAggregationHandle.hpp"
#include "storage/ValueAccessor.hpp"
#include "types/Type.hpp"
#include "types/TypeFactory.hpp"
#include "types/TypeID.hpp"
#include "types/TypedValue.hpp"
#include "types/containers/ColumnVectorsValueAccessor.hpp"
#include "types/operations/binary_operations/BinaryOperation.hpp"
#include "types/operations/binary_operations/BinaryOperationFactory.hpp"
#include "types/operations/binary_operations/BinaryOperationID.hpp"
#include "types/operations/comparisons/Comparison.hpp"
#include "glog/logging.h"
namespace quickstep {
// Builds an AVG window-aggregation handle.
//
// The window description (partitioning, ordering, ROWS vs. RANGE, and the
// preceding/following extents) is forwarded unchanged to the base class. This
// constructor only prepares the arithmetic used by calculate():
//   - sum_type_: the type of the running sum. Int/Long are summed as Long and
//     Float/Double as Double for extra headroom; any other argument type is
//     summed in its own type.
//   - result_type_: the nullable result of dividing sum_type_ by Double. It is
//     nullable because AVG over zero (or only NULL) values yields NULL.
//   - fast_add_operator_ / fast_subtract_operator_: sum_type_ (op) argument
//     type, used to slide values into and out of the window.
//   - divide_operator_: sum_type_ / Double, used to produce the final average.
WindowAggregationHandleAvg::WindowAggregationHandleAvg(
    const std::vector<std::unique_ptr<const Scalar>> &partition_by_attributes,
    const std::vector<std::unique_ptr<const Scalar>> &order_by_attributes,
    const bool is_row,
    const std::int64_t num_preceding,
    const std::int64_t num_following,
    const Type *argument_type)
    : WindowAggregationHandle(partition_by_attributes,
                              order_by_attributes,
                              is_row,
                              num_preceding,
                              num_following) {
  // Widen the accumulator so that summing many values has more headroom.
  const TypeID sum_type_id = [argument_type]() -> TypeID {
    switch (argument_type->getTypeID()) {
      case kInt:
      case kLong:
        return kLong;
      case kFloat:
      case kDouble:
        return kDouble;
      default:
        return argument_type->getTypeID();
    }
  }();
  sum_type_ = &TypeFactory::GetType(sum_type_id);

  const Type &double_type = TypeFactory::GetType(kDouble);
  const auto &divide_operation =
      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide);

  // AVG() of an empty or all-NULL window is NULL, hence the nullable version.
  result_type_ =
      &divide_operation.resultTypeForArgumentTypes(*sum_type_, double_type)
           ->getNullableVersion();

  // Adds an incoming argument value to the running sum.
  fast_add_operator_.reset(
      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kAdd)
          .makeUncheckedBinaryOperatorForTypes(*sum_type_, *argument_type));

  // Removes an argument value that has slid off the front of the window.
  fast_subtract_operator_.reset(
      BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kSubtract)
          .makeUncheckedBinaryOperatorForTypes(*sum_type_, *argument_type));

  // Turns sum / count into the final average.
  divide_operator_.reset(
      divide_operation.makeUncheckedBinaryOperatorForTypes(*sum_type_,
                                                           double_type));
}
// Computes AVG over the configured window frame for every tuple in
// tuple_accessor. A running sum and non-NULL count slide forward with the
// frame, so each window is not re-summed from scratch.
//
// @param tuple_accessor Accessor over the tuples being aggregated. It is
//        iterated from the beginning and passed to samePartition() and
//        inWindow() to decide partition and frame membership.
// @param arguments Exactly one native ColumnVector holding the AVG argument,
//        with one value per tuple in tuple_accessor. It is only borrowed here;
//        this method never takes ownership of it.
// @return A newly allocated NativeColumnVector of result_type_ with one entry
//         per tuple. Each entry is the frame's average, or NULL when the frame
//         contains no non-NULL values. The caller takes ownership.
ColumnVector* WindowAggregationHandleAvg::calculate(
    ColumnVectorsValueAccessor *tuple_accessor,
    const std::vector<ColumnVector*> &arguments) const {
  DCHECK_EQ(1u, arguments.size());
  DCHECK(arguments[0]->isNative());
  DCHECK_EQ(static_cast<std::size_t>(tuple_accessor->getNumTuples()),
            static_cast<const NativeColumnVector*>(arguments[0])->size());

  // Output column; ownership is handed to the caller on return.
  NativeColumnVector *window_aggregates =
      new NativeColumnVector(*result_type_, tuple_accessor->getNumTuples());

  // Wrap the argument column so its values can be read by absolute position.
  // The accessor lives on the stack so it is always released; the former
  // heap-allocated accessor was never deleted. It must not take ownership of
  // the column, which is only borrowed through a const reference.
  ColumnVectorsValueAccessor argument_accessor;
  argument_accessor.addColumn(arguments[0], false /* owns */);

  // Running state for the current window frame.
  TypedValue sum = sum_type_->makeZeroValue();
  std::uint64_t count = 0;      // Number of non-NULL values in the frame.
  tuple_id start_tuple_id = 0;  // Id of the first tuple in the frame.
  tuple_id end_tuple_id = 0;    // Id one past the last tuple in the frame.

  tuple_accessor->beginIteration();
  argument_accessor.beginIteration();
  while (tuple_accessor->next() && argument_accessor.next()) {
    const tuple_id current_tuple_id = tuple_accessor->getCurrentPosition();

    // Start a fresh frame when this tuple begins a new partition. For the very
    // first tuple this probes id -1 (presumably treated by samePartition() as
    // "not the same partition"). Either way, the reset is a no-op at that
    // point because the state already equals its reset values.
    if (!samePartition(tuple_accessor, current_tuple_id - 1)) {
      start_tuple_id = current_tuple_id;
      end_tuple_id = current_tuple_id;
      count = 0;
      sum = sum_type_->makeZeroValue();
    }

    // Slide the front of the frame: drop tuples no longer in the window.
    while (!inWindow(tuple_accessor, start_tuple_id)) {
      const TypedValue start_value =
          argument_accessor.getTypedValueAtAbsolutePosition(0, start_tuple_id);
      // NULLs were never added, so they are not subtracted either.
      if (!start_value.isNull()) {
        sum = fast_subtract_operator_->applyToTypedValues(sum, start_value);
        --count;
      }
      ++start_tuple_id;
    }

    // With no non-NULL values left in the frame, the running sum should be
    // exactly zero. Resetting it discards floating-point residue that
    // repeated add/subtract leaves behind on Double sums.
    if (count == 0) {
      sum = sum_type_->makeZeroValue();
    }

    // Slide the back of the frame: add tuples that have entered the window.
    while (inWindow(tuple_accessor, end_tuple_id)) {
      const TypedValue end_value =
          argument_accessor.getTypedValueAtAbsolutePosition(0, end_tuple_id);
      // AVG ignores NULL inputs.
      if (!end_value.isNull()) {
        sum = fast_add_operator_->applyToTypedValues(sum, end_value);
        ++count;
      }
      ++end_tuple_id;
    }

    // An empty or all-NULL frame averages to NULL; otherwise emit sum / count.
    if (count == 0) {
      window_aggregates->appendTypedValue(result_type_->makeNullValue());
    } else {
      window_aggregates->appendTypedValue(divide_operator_->applyToTypedValues(
          sum, TypedValue(static_cast<double>(count))));
    }
  }

  return window_aggregates;
}
} // namespace quickstep