blob: f8db180b1e3003b9c13e568bd3bef189efd19569 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cmath>
#include "arrow/compute/api_aggregate.h"
#include "arrow/compute/kernels/aggregate_internal.h"
#include "arrow/compute/kernels/common.h"
#include "arrow/util/align_util.h"
#include "arrow/util/bit_block_counter.h"
namespace arrow {
namespace compute {
namespace aggregate {
// Declaration only; the definition is not part of this header.
// Takes a kernel `init` callback, a list of input `types`, an output type
// `out_ty`, the target aggregate function `func`, and a SIMD level that defaults
// to SimdLevel::NONE.
// NOTE(review): presumably adds to `func` one kernel per entry of `types`, each
// using `init` and producing `out_ty` -- confirm against the definition.
void AddBasicAggKernels(KernelInit init,
const std::vector<std::shared_ptr<DataType>>& types,
std::shared_ptr<DataType> out_ty, ScalarAggregateFunction* func,
SimdLevel::type simd_level = SimdLevel::NONE);
// Declaration only; the definition is not part of this header.
// Takes a kernel `init` callback, a list of input `types`, the target aggregate
// function `func`, and a SIMD level that defaults to SimdLevel::NONE. Unlike
// AddBasicAggKernels there is no output-type parameter.
// NOTE(review): presumably adds to `func` one min/max kernel per entry of
// `types` -- confirm against the definition.
void AddMinMaxKernels(KernelInit init,
const std::vector<std::shared_ptr<DataType>>& types,
ScalarAggregateFunction* func,
SimdLevel::type simd_level = SimdLevel::NONE);
// SIMD variants for kernels
// Declarations only; each takes the aggregate function to extend.
// NOTE(review): judging by the names, these add AVX2 / AVX512 builds of the
// sum, mean and min/max kernels to `func`, and are presumably defined in
// translation units compiled with the matching instruction set -- confirm.
void AddSumAvx2AggKernels(ScalarAggregateFunction* func);
void AddMeanAvx2AggKernels(ScalarAggregateFunction* func);
void AddMinMaxAvx2AggKernels(ScalarAggregateFunction* func);
void AddSumAvx512AggKernels(ScalarAggregateFunction* func);
void AddMeanAvx512AggKernels(ScalarAggregateFunction* func);
void AddMinMaxAvx512AggKernels(ScalarAggregateFunction* func);
// ----------------------------------------------------------------------
// Sum implementation
// Sum aggregator state for values of `ArrowType`.
//
// Tracks how many non-null values have been seen (`count`) and their running
// total (`sum`), stored in the accumulator type selected by FindAccumulatorType.
template <typename ArrowType, SimdLevel::type SimdLevel>
struct SumImpl : public ScalarAggregator {
  using ThisType = SumImpl<ArrowType, SimdLevel>;
  using CType = typename ArrowType::c_type;
  using SumType = typename FindAccumulatorType<ArrowType>::Type;
  using OutputType = typename TypeTraits<SumType>::ScalarType;

  // Fold the first column of `batch` into this state.
  //
  // The batch is added to whatever has been accumulated so far, so consuming
  // several batches with the same state keeps all of them. For a freshly
  // constructed state (count == 0, sum == 0) this is the same as assignment.
  Status Consume(KernelContext*, const ExecBatch& batch) override {
    const auto& data = batch[0].array();
    this->count += data->length - data->GetNullCount();
    if (is_boolean_type<ArrowType>::value) {
      // For booleans the sum is the number of true values.
      this->sum +=
          static_cast<typename SumType::c_type>(BooleanArray(data).true_count());
    } else {
      this->sum +=
          arrow::compute::detail::SumArray<CType, typename SumType::c_type>(*data);
    }
    return Status::OK();
  }

  // Add the count and sum of another SumImpl state (`src`) to this one.
  Status MergeFrom(KernelContext*, KernelState&& src) override {
    const auto& other = checked_cast<const ThisType&>(src);
    this->count += other.count;
    this->sum += other.sum;
    return Status::OK();
  }

  // Write the result to `out`: a default-constructed OutputType scalar when no
  // non-null value was counted, otherwise a scalar holding the sum.
  Status Finalize(KernelContext*, Datum* out) override {
    if (this->count == 0) {
      out->value = std::make_shared<OutputType>();
    } else {
      out->value = MakeScalar(this->sum);
    }
    return Status::OK();
  }

  // Number of non-null values accumulated.
  size_t count = 0;
  // Running total of the accumulated values.
  typename SumType::c_type sum = 0;
};
// Mean aggregator: accumulates exactly like SumImpl and differs only in how the
// final scalar is produced.
template <typename ArrowType, SimdLevel::type SimdLevel>
struct MeanImpl : public SumImpl<ArrowType, SimdLevel> {
  // Emit sum / count as a DoubleScalar, or a default-constructed DoubleScalar
  // when no non-null value was counted.
  Status Finalize(KernelContext*, Datum* out) override {
    std::shared_ptr<DoubleScalar> result;
    if (this->count != 0) {
      const double average = static_cast<double>(this->sum) / this->count;
      result = std::make_shared<DoubleScalar>(average);
    } else {
      result = std::make_shared<DoubleScalar>();
    }
    out->value = std::move(result);
    return Status::OK();
  }
};
// Type visitor that instantiates the aggregator `KernelClass<T>` matching a
// runtime DataType. Call Create() to run the visit and obtain the state.
template <template <typename> class KernelClass>
struct SumLikeInit {
  std::unique_ptr<KernelState> state;
  KernelContext* ctx;
  const DataType& type;

  SumLikeInit(KernelContext* ctx, const DataType& type) : ctx(ctx), type(type) {}

  // Fallback for every type that has no more specific overload below.
  Status Visit(const DataType&) { return Unsupported(); }

  // Half floats are explicitly rejected.
  Status Visit(const HalfFloatType&) { return Unsupported(); }

  Status Visit(const BooleanType&) { return Install<BooleanType>(); }

  template <typename Type>
  enable_if_number<Type, Status> Visit(const Type&) {
    return Install<Type>();
  }

  // Dispatch on `type`; on success hand the freshly created state to the caller.
  Result<std::unique_ptr<KernelState>> Create() {
    RETURN_NOT_OK(VisitTypeInline(type, this));
    return std::move(state);
  }

 private:
  static Status Unsupported() { return Status::NotImplemented("No sum implemented"); }

  // Replace `state` with a new KernelClass<T>.
  template <typename T>
  Status Install() {
    state.reset(new KernelClass<T>());
    return Status::OK();
  }
};
// ----------------------------------------------------------------------
// MinMax implementation
// Primary MinMaxState template. It is empty: the members used by the min/max
// aggregators (min, max, has_nulls, has_values, operator+=, MergeOne) come from
// the partial specializations for boolean, integer and floating-point types,
// which are selected through the `Enable` parameter.
template <typename ArrowType, SimdLevel::type SimdLevel, typename Enable = void>
struct MinMaxState {};
// MinMaxState for boolean inputs: `min` is the logical AND of everything merged
// so far and `max` the logical OR.
template <typename ArrowType, SimdLevel::type SimdLevel>
struct MinMaxState<ArrowType, SimdLevel, enable_if_boolean<ArrowType>> {
  using ThisType = MinMaxState<ArrowType, SimdLevel>;
  using T = typename ArrowType::c_type;

  // Combine another partial state into this one.
  ThisType& operator+=(const ThisType& rhs) {
    has_nulls = has_nulls || rhs.has_nulls;
    has_values = has_values || rhs.has_values;
    // AND for min: a single false on either side makes the minimum false.
    if (!rhs.min) {
      min = false;
    }
    // OR for max: a single true on either side makes the maximum true.
    if (rhs.max) {
      max = true;
    }
    return *this;
  }

  // Fold a single value into min/max; the has_* flags are left untouched.
  void MergeOne(T value) {
    if (value) {
      max = true;
    } else {
      min = false;
    }
  }

  // Identity elements: AND starts at true, OR starts at false.
  T min = true;
  T max = false;
  bool has_nulls = false;
  bool has_values = false;
};
// MinMaxState for integer inputs, compared with operator<.
template <typename ArrowType, SimdLevel::type SimdLevel>
struct MinMaxState<ArrowType, SimdLevel, enable_if_integer<ArrowType>> {
  using ThisType = MinMaxState<ArrowType, SimdLevel>;
  using T = typename ArrowType::c_type;

  // Combine another partial state into this one.
  ThisType& operator+=(const ThisType& rhs) {
    has_nulls = has_nulls || rhs.has_nulls;
    has_values = has_values || rhs.has_values;
    if (rhs.min < min) {
      min = rhs.min;
    }
    if (max < rhs.max) {
      max = rhs.max;
    }
    return *this;
  }

  // Fold a single value into min/max; the has_* flags are left untouched.
  void MergeOne(T value) {
    if (value < min) {
      min = value;
    }
    if (max < value) {
      max = value;
    }
  }

  // Start from the opposite extremes so the first merged value replaces both.
  T min = std::numeric_limits<T>::max();
  T max = std::numeric_limits<T>::min();
  bool has_nulls = false;
  bool has_values = false;
};
// MinMaxState for floating-point inputs. Values are combined with
// std::fmin / std::fmax rather than operator<.
template <typename ArrowType, SimdLevel::type SimdLevel>
struct MinMaxState<ArrowType, SimdLevel, enable_if_floating_point<ArrowType>> {
  using ThisType = MinMaxState<ArrowType, SimdLevel>;
  using T = typename ArrowType::c_type;

  // Combine another partial state into this one.
  ThisType& operator+=(const ThisType& rhs) {
    has_nulls = has_nulls || rhs.has_nulls;
    has_values = has_values || rhs.has_values;
    const T merged_min = std::fmin(min, rhs.min);
    const T merged_max = std::fmax(max, rhs.max);
    min = merged_min;
    max = merged_max;
    return *this;
  }

  // Fold a single value into min/max; the has_* flags are left untouched.
  void MergeOne(T value) {
    const T merged_min = std::fmin(min, value);
    const T merged_max = std::fmax(max, value);
    min = merged_min;
    max = merged_max;
  }

  // Start from +inf / -inf so the first merged value replaces both.
  T min = std::numeric_limits<T>::infinity();
  T max = -std::numeric_limits<T>::infinity();
  bool has_nulls = false;
  bool has_values = false;
};
// Min/max aggregator for arrays of `ArrowType`.
//
// Accumulates a MinMaxState and finalizes it into a StructScalar of type
// `out_type` whose two children are (min, max).
template <typename ArrowType, SimdLevel::type SimdLevel>
struct MinMaxImpl : public ScalarAggregator {
  using ArrayType = typename TypeTraits<ArrowType>::ArrayType;
  using ThisType = MinMaxImpl<ArrowType, SimdLevel>;
  using StateType = MinMaxState<ArrowType, SimdLevel>;

  MinMaxImpl(const std::shared_ptr<DataType>& out_type, const MinMaxOptions& options)
      : out_type(out_type), options(options) {}

  // Fold the first column of `batch` into this state.
  //
  // The batch is merged into the existing state rather than replacing it, so
  // consuming several batches with the same aggregator keeps all of them. For
  // a freshly constructed state the merge is equivalent to assignment.
  Status Consume(KernelContext*, const ExecBatch& batch) override {
    StateType local;
    ArrayType arr(batch[0].array());
    const auto null_count = arr.null_count();
    local.has_nulls = null_count > 0;
    local.has_values = (arr.length() - null_count) > 0;

    if (local.has_nulls && options.null_handling == MinMaxOptions::EMIT_NULL) {
      // Finalize() will emit (null, null) in this mode, so only the flags need
      // to be recorded; scanning the values would be wasted work.
      this->state += local;
      return Status::OK();
    }

    if (local.has_nulls) {
      local += ConsumeWithNulls(arr);
    } else {  // All true values
      for (int64_t i = 0; i < arr.length(); i++) {
        local.MergeOne(arr.Value(i));
      }
    }
    this->state += local;
    return Status::OK();
  }

  // Merge the state of another MinMaxImpl (`src`) into this one.
  Status MergeFrom(KernelContext*, KernelState&& src) override {
    const auto& other = checked_cast<const ThisType&>(src);
    this->state += other.state;
    return Status::OK();
  }

  // Write the (min, max) struct to `out`. Both children are default-constructed
  // (null) scalars when no valid value was seen, or when nulls were seen and
  // the options request EMIT_NULL.
  Status Finalize(KernelContext*, Datum* out) override {
    using ScalarType = typename TypeTraits<ArrowType>::ScalarType;
    std::vector<std::shared_ptr<Scalar>> values;
    if (!state.has_values ||
        (state.has_nulls && options.null_handling == MinMaxOptions::EMIT_NULL)) {
      // (null, null)
      values = {std::make_shared<ScalarType>(), std::make_shared<ScalarType>()};
    } else {
      values = {std::make_shared<ScalarType>(state.min),
                std::make_shared<ScalarType>(state.max)};
    }
    out->value = std::make_shared<StructScalar>(std::move(values), this->out_type);
    return Status::OK();
  }

  std::shared_ptr<DataType> out_type;
  MinMaxOptions options;
  MinMaxState<ArrowType, SimdLevel> state;

 private:
  // Compute min/max over the valid (non-null) slots of `arr`.
  //
  // Only min/max of the returned state are meaningful; its has_nulls and
  // has_values flags are left at their defaults.
  StateType ConsumeWithNulls(const ArrayType& arr) const {
    StateType local;
    const int64_t length = arr.length();
    int64_t offset = arr.offset();
    const uint8_t* bitmap = arr.null_bitmap_data();
    // `idx` indexes values of `arr`; `offset` is the matching bit position in
    // the validity bitmap. The two are always advanced together.
    int64_t idx = 0;

    const auto p = arrow::internal::BitmapWordAlign<1>(bitmap, offset, length);
    // First handle the leading bits
    const int64_t leading_bits = p.leading_bits;
    while (idx < leading_bits) {
      if (BitUtil::GetBit(bitmap, offset)) {
        local.MergeOne(arr.Value(idx));
      }
      idx++;
      offset++;
    }

    // The aligned parts scanned with BitBlockCounter
    arrow::internal::BitBlockCounter data_counter(bitmap, offset, length - leading_bits);
    auto current_block = data_counter.NextWord();
    while (idx < length) {
      if (current_block.AllSet()) {  // All true values
        // 64-bit on purpose: array lengths are int64_t, so a contiguous run of
        // valid values may exceed the range of int.
        int64_t run_length = 0;
        // Scan forward until a block that has some false values (or the end)
        while (current_block.length > 0 && current_block.AllSet()) {
          run_length += current_block.length;
          current_block = data_counter.NextWord();
        }
        for (int64_t i = 0; i < run_length; i++) {
          local.MergeOne(arr.Value(idx + i));
        }
        idx += run_length;
        offset += run_length;
        // The current_block already computed, advance to next loop
        continue;
      } else if (!current_block.NoneSet()) {  // Some values are null
        BitmapReader reader(arr.null_bitmap_data(), offset, current_block.length);
        for (int64_t i = 0; i < current_block.length; i++) {
          if (reader.IsSet()) {
            local.MergeOne(arr.Value(idx + i));
          }
          reader.Next();
        }
        idx += current_block.length;
        offset += current_block.length;
      } else {  // All null values
        idx += current_block.length;
        offset += current_block.length;
      }
      current_block = data_counter.NextWord();
    }
    return local;
  }
};
// Min/max aggregator for boolean arrays.
//
// Reuses MergeFrom/Finalize from MinMaxImpl<BooleanType> but derives min and
// max from the array's value counts instead of visiting each element.
template <SimdLevel::type SimdLevel>
struct BooleanMinMaxImpl : public MinMaxImpl<BooleanType, SimdLevel> {
  using StateType = MinMaxState<BooleanType, SimdLevel>;
  using ArrayType = typename TypeTraits<BooleanType>::ArrayType;
  using MinMaxImpl<BooleanType, SimdLevel>::MinMaxImpl;
  using MinMaxImpl<BooleanType, SimdLevel>::options;

  // Fold the first column of `batch` into this state.
  //
  // The batch is merged into the existing state rather than replacing it, so
  // consuming several batches with the same aggregator keeps all of them. For
  // a freshly constructed state the merge is equivalent to assignment.
  Status Consume(KernelContext*, const ExecBatch& batch) override {
    StateType local;
    ArrayType arr(batch[0].array());
    const auto arr_length = arr.length();
    const auto null_count = arr.null_count();
    const auto valid_count = arr_length - null_count;
    local.has_nulls = null_count > 0;
    local.has_values = valid_count > 0;

    if (local.has_nulls && options.null_handling == MinMaxOptions::EMIT_NULL) {
      // The result is (null, null) in this mode; only the flags matter.
      this->state += local;
      return Status::OK();
    }

    const auto true_count = arr.true_count();
    const auto false_count = valid_count - true_count;
    // max is true as soon as one valid value is true; min stays true only if
    // no valid value is false.
    local.max = true_count > 0;
    local.min = false_count == 0;

    this->state += local;
    return Status::OK();
  }
};
// Type visitor that instantiates the min/max aggregator matching a runtime
// input DataType. Call Create() to run the visit and obtain the state.
template <SimdLevel::type SimdLevel>
struct MinMaxInitState {
  std::unique_ptr<KernelState> state;
  KernelContext* ctx;
  const DataType& in_type;
  const std::shared_ptr<DataType>& out_type;
  const MinMaxOptions& options;

  MinMaxInitState(KernelContext* ctx, const DataType& in_type,
                  const std::shared_ptr<DataType>& out_type, const MinMaxOptions& options)
      : ctx(ctx), in_type(in_type), out_type(out_type), options(options) {}

  // Fallback for every type that has no more specific overload below.
  Status Visit(const DataType&) { return Unsupported(); }

  // Half floats are explicitly rejected.
  Status Visit(const HalfFloatType&) { return Unsupported(); }

  // Booleans use the dedicated count-based implementation.
  Status Visit(const BooleanType&) { return Install<BooleanMinMaxImpl<SimdLevel>>(); }

  template <typename Type>
  enable_if_number<Type, Status> Visit(const Type&) {
    return Install<MinMaxImpl<Type, SimdLevel>>();
  }

  // Dispatch on `in_type`; on success hand the freshly created state to the
  // caller.
  Result<std::unique_ptr<KernelState>> Create() {
    RETURN_NOT_OK(VisitTypeInline(in_type, this));
    return std::move(state);
  }

 private:
  static Status Unsupported() {
    return Status::NotImplemented("No min/max implemented");
  }

  // Replace `state` with a new `Impl` built from the output type and options.
  template <typename Impl>
  Status Install() {
    state.reset(new Impl(out_type, options));
    return Status::OK();
  }
};
} // namespace aggregate
} // namespace compute
} // namespace arrow