blob: 0b1f6b5658e75941306970faa3e3204454538c5f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "arrow/compute/exec.h"
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>
#include "arrow/array/array_base.h"
#include "arrow/array/array_primitive.h"
#include "arrow/array/data.h"
#include "arrow/array/util.h"
#include "arrow/buffer.h"
#include "arrow/chunked_array.h"
#include "arrow/compute/exec_internal.h"
#include "arrow/compute/function.h"
#include "arrow/compute/kernel.h"
#include "arrow/compute/registry.h"
#include "arrow/compute/util_internal.h"
#include "arrow/datum.h"
#include "arrow/record_batch.h"
#include "arrow/scalar.h"
#include "arrow/status.h"
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/bitmap_ops.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/cpu_info.h"
#include "arrow/util/logging.h"
#include "arrow/util/make_unique.h"
#include "arrow/util/vector.h"
namespace arrow {
using internal::BitmapAnd;
using internal::checked_cast;
using internal::CopyBitmap;
using internal::CpuInfo;
namespace compute {
// Returns the address of a function-local static ExecContext, so every call
// hands back the same default-constructed instance.
ExecContext* default_exec_context() {
static ExecContext default_ctx;
return &default_ctx;
// Builds an ExecBatch from a RecordBatch: `values` gets one slot per column
// and `length` is the batch's row count.
ExecBatch::ExecBatch(const RecordBatch& batch)
: values(batch.num_columns()), length(batch.num_rows()) {
// Take the column data into a local vector, then move each element into the
// matching slot of `values`
auto columns = batch.column_data();
std::move(columns.begin(), columns.end(), values.begin());
// Builds an ExecBatch from `values`, inferring the batch length.
//
// Scalars never constrain the length: only non-scalar values are compared,
// and they must all report the same length. If every value is a scalar the
// batch length is 1.
//
// Returns Status::Invalid when `values` is empty or when two non-scalar
// values disagree on length.
Result<ExecBatch> ExecBatch::Make(std::vector<Datum> values) {
  if (values.empty()) {
    return Status::Invalid("Cannot infer ExecBatch length without at least one value");
  }

  int64_t length = -1;
  for (const auto& value : values) {
    if (value.is_scalar()) {
      // A scalar is valid for any batch length, so it must not pin the
      // length to 1 ahead of the arrays that follow it
      continue;
    }
    if (length == -1) {
      length = value.length();
      continue;
    }
    if (length != value.length()) {
      return Status::Invalid(
          "Arrays used to construct an ExecBatch must have equal length");
    }
  }

  if (length == -1) {
    // Only scalars were supplied
    length = 1;
  }
  return ExecBatch(std::move(values), length);
}
namespace {
// Allocates the buffer backing `length` values that are `bit_width` bits wide.
//
// A width of 1 is served by KernelContext::AllocateBitmap(length). Any other
// width allocates BitUtil::BytesForBits(length * bit_width) bytes through
// KernelContext::Allocate. The allocation result is returned as-is.
Result<std::shared_ptr<Buffer>> AllocateDataBuffer(KernelContext* ctx, int64_t length,
                                                   int bit_width) {
  if (bit_width == 1) {
    return ctx->AllocateBitmap(length);
  }
  const int64_t buffer_size = BitUtil::BytesForBits(length * bit_width);
  return ctx->Allocate(buffer_size);
}
// Describes how one data buffer of an output array is preallocated.
struct BufferPreallocation {
  // Both arguments are optional; the defaults describe "no preallocation".
  explicit BufferPreallocation(int bit_width = -1, int added_length = 0)
      : bit_width(bit_width), added_length(added_length) {}

  // Width in bits of one element. The executor only allocates the buffer
  // when this is >= 0, so the default of -1 disables preallocation.
  int bit_width;
  // Number of extra elements allocated on top of the output length (the
  // offsets buffers of binary/list types use 1).
  int added_length;
};
void ComputeDataPreallocate(const DataType& type,
std::vector<BufferPreallocation>* widths) {
if (is_fixed_width( && != Type::NA) {
widths->emplace_back(checked_cast<const FixedWidthType&>(type).bit_width());
// Preallocate binary and list offsets
switch ( {
case Type::BINARY:
case Type::STRING:
case Type::LIST:
case Type::MAP:
widths->emplace_back(32, /*added_length=*/1);
case Type::LARGE_BINARY:
case Type::LARGE_STRING:
case Type::LARGE_LIST:
widths->emplace_back(64, /*added_length=*/1);
} // namespace
namespace detail {
// Checks that every Datum in `values` satisfies is_value(). Returns
// Status::Invalid for the first one that does not, Status::OK() otherwise.
Status CheckAllValues(const std::vector<Datum>& values) {
for (const auto& value : values) {
if (!value.is_value()) {
// NOTE(review): this call is cut off after the message prefix in this view;
// the trailing argument(s) and closing parenthesis must be restored from the
// upstream file.
return Status::Invalid("Tried executing function with non-value type: ",
return Status::OK();
// Takes ownership of `args`, records the maximum chunk size, and creates one
// chunk index and one in-chunk position per argument, all starting at 0.
// NOTE(review): `length` is not used by the initializers visible here, yet
// Next() reads position_ and length_ -- their initializers appear to be
// missing from this view; confirm against upstream.
ExecBatchIterator::ExecBatchIterator(std::vector<Datum> args, int64_t length,
int64_t max_chunksize)
: args_(std::move(args)),
max_chunksize_(max_chunksize) {
chunk_indexes_.resize(args_.size(), 0);
chunk_positions_.resize(args_.size(), 0);
// Validates `args` and creates an iterator over them.
//
// Every argument must be array-like or a scalar, otherwise Status::Invalid is
// returned. The arguments' lengths are then reconciled into a single
// `length`; a mismatch yields Status::Invalid. `max_chunksize` is clamped to
// that length before the iterator is constructed.
Result<std::unique_ptr<ExecBatchIterator>> ExecBatchIterator::Make(
std::vector<Datum> args, int64_t max_chunksize) {
for (const auto& arg : args) {
if (!(arg.is_arraylike() || arg.is_scalar())) {
return Status::Invalid(
"ExecBatchIterator only works with Scalar, Array, and "
"ChunkedArray arguments");
// If the arguments are all scalars, then the length is 1
int64_t length = 1;
bool length_set = false;
for (auto& arg : args) {
// NOTE(review): the comment above and the error message below imply scalars
// are skipped here, but the statement(s) ending this branch are not visible
// in this view -- confirm against upstream.
if (arg.is_scalar()) {
if (!length_set) {
length = arg.length();
length_set = true;
} else {
if (arg.length() != length) {
return Status::Invalid("Array arguments must all be the same length");
// Never iterate in chunks larger than the data itself
max_chunksize = std::min(length, max_chunksize);
// Plain `new` rather than a make_unique helper (presumably because the
// constructor is not publicly accessible -- TODO confirm)
return std::unique_ptr<ExecBatchIterator>(
new ExecBatchIterator(std::move(args), length, max_chunksize));
// Writes the next slice of the arguments into `batch` and advances the
// iterator. Returns false once position_ has reached length_, true otherwise.
bool ExecBatchIterator::Next(ExecBatch* batch) {
if (position_ == length_) {
return false;
// Determine how large the common contiguous "slice" of all the arguments is
int64_t iteration_size = std::min(length_ - position_, max_chunksize_);
// If length_ is 0, then this loop will never execute
for (size_t i = 0; i < args_.size() && iteration_size > 0; ++i) {
// If the argument is not a chunked array, it's either a Scalar or Array,
// in which case it doesn't influence the size of this batch. Note that if
// the args are all scalars the batch length is 1
if (args_[i].kind() != Datum::CHUNKED_ARRAY) {
// NOTE(review): the statement that skips non-chunked arguments is not
// visible in this view.
const ChunkedArray& arg = *args_[i].chunked_array();
std::shared_ptr<Array> current_chunk;
// Locate the chunk that still has unread values.
// NOTE(review): no statement visible here advances chunk_indexes_[i] or
// leaves the loop; those lines appear to be missing -- confirm upstream.
while (true) {
current_chunk = arg.chunk(chunk_indexes_[i]);
if (chunk_positions_[i] == current_chunk->length()) {
// Chunk is zero-length, or was exhausted in the previous iteration
chunk_positions_[i] = 0;
// The batch cannot extend past the end of this argument's current chunk
iteration_size =
std::min(current_chunk->length() - chunk_positions_[i], iteration_size);
// Now, fill the batch
// NOTE(review): batch->values is indexed below but nothing visible here
// sizes it to args_.size() -- confirm against upstream.
batch->length = iteration_size;
for (size_t i = 0; i < args_.size(); ++i) {
if (args_[i].is_scalar()) {
// Scalars are passed through unchanged
batch->values[i] = args_[i].scalar();
} else if (args_[i].is_array()) {
// Arrays are sliced at the global position
batch->values[i] = args_[i].array()->Slice(position_, iteration_size);
} else {
// Chunked arrays are sliced relative to the current chunk, and the in-chunk
// position is advanced
const ChunkedArray& carr = *args_[i].chunked_array();
const auto& chunk = carr.chunk(chunk_indexes_[i]);
batch->values[i] = chunk->data()->Slice(chunk_positions_[i], iteration_size);
chunk_positions_[i] += iteration_size;
position_ += iteration_size;
DCHECK_LE(position_, length_);
return true;
namespace {
// Classifies a Datum by what is already known about its nulls, without
// counting validity bits.
// NOTE(review): the declaration of `type` and of its enumerators (ALL_NULL,
// ALL_VALID, ...) is not visible in this view, nor is the final return of
// Get() for the case where neither shortcut applies.
struct NullGeneralization {
static type Get(const Datum& datum) {
// A value of the NA type is all null
if (datum.type()->id() == Type::NA) {
return ALL_NULL;
// A scalar is either entirely valid or entirely null
if (datum.is_scalar()) {
return datum.scalar()->is_valid ? ALL_VALID : ALL_NULL;
const auto& arr = *datum.array();
// Do not count the bits if they haven't been counted already
const int64_t known_null_count = arr.null_count.load();
// A zero null count or an absent validity bitmap is treated as all valid
if ((known_null_count == 0) || (arr.buffers[0] == NULLPTR)) {
return ALL_VALID;
if (known_null_count == arr.length) {
return ALL_NULL;
// Null propagation implementation that deals both with preallocated bitmaps
// and maybe-to-be allocated bitmaps
// If the bitmap is preallocated, it MUST be populated (since it might be a
// view of a much larger bitmap). If it isn't preallocated, then we have
// more flexibility.
// * If the batch has no nulls, then we do nothing
// * If only a single array has nulls, and its offset is a multiple of 8,
// then we can zero-copy the bitmap into the output
// * Otherwise, we allocate the bitmap and populate it
// Computes the validity bitmap of `output` from the validity of the values in
// an ExecBatch. Execute() is the entry point; the other methods implement its
// individual cases.
// NOTE(review): access specifiers and several statements (for example the
// code that fills arrays_with_nulls_ and the calls to EnsureAllocated()) are
// not visible in this view -- confirm against upstream.
class NullPropagator {
// Inspects every value of `batch` once, recording whether any of them is
// entirely null, and notes whether `output` already carries a validity
// buffer.
NullPropagator(KernelContext* ctx, const ExecBatch& batch, ArrayData* output)
: ctx_(ctx), batch_(batch), output_(output) {
for (const Datum& datum : batch_.values) {
auto null_generalization = NullGeneralization::Get(datum);
if (null_generalization == NullGeneralization::ALL_NULL) {
is_all_null_ = true;
if (null_generalization != NullGeneralization::ALL_VALID &&
datum.kind() == Datum::ARRAY) {
// A non-null buffers[0] on the output means the caller preallocated the
// bitmap
if (output->buffers[0] != nullptr) {
bitmap_preallocated_ = true;
// Points bitmap_ at the writable bytes of `bitmap`
void SetBitmap(Buffer* bitmap) { bitmap_ = bitmap->mutable_data(); }
// Allocates the output validity bitmap unless it was preallocated
Status EnsureAllocated() {
if (bitmap_preallocated_) {
return Status::OK();
ARROW_ASSIGN_OR_RAISE(output_->buffers[0], ctx_->AllocateBitmap(output_->length));
return Status::OK();
// Marks the whole output as null
Status AllNullShortCircuit() {
// OK, the output should be all null
output_->null_count = output_->length;
if (bitmap_preallocated_) {
BitUtil::SetBitsTo(bitmap_, output_->offset, output_->length, false);
return Status::OK();
// Walk all the values with nulls instead of breaking on the first in case
// we find a bitmap that can be reused in the non-preallocated case
for (const ArrayData* arr : arrays_with_nulls_) {
if (arr->null_count.load() == arr->length && arr->buffers[0] != nullptr) {
// Reuse this all null bitmap
output_->buffers[0] = arr->buffers[0];
return Status::OK();
// No reusable bitmap was found: clear every bit of the output bitmap
BitUtil::SetBitsTo(bitmap_, output_->offset, output_->length, false);
return Status::OK();
// Handles the case where exactly one input array may contain nulls
Status PropagateSingle() {
// One array
const ArrayData& arr = *arrays_with_nulls_[0];
const std::shared_ptr<Buffer>& arr_bitmap = arr.buffers[0];
// Reuse the null count if it's known
output_->null_count = arr.null_count.load();
if (bitmap_preallocated_) {
CopyBitmap(arr_bitmap->data(), arr.offset, arr.length, bitmap_, output_->offset);
return Status::OK();
// Three cases when memory was not pre-allocated:
// * Offset is zero: we reuse the bitmap as is
// * Offset is nonzero but a multiple of 8: we can slice the bitmap
// * Offset is not a multiple of 8: we must allocate and use CopyBitmap
// Keep in mind that output_->offset is not permitted to be nonzero when
// the bitmap is not preallocated, and that precondition is asserted
// higher in the call stack.
if (arr.offset == 0) {
output_->buffers[0] = arr_bitmap;
} else if (arr.offset % 8 == 0) {
output_->buffers[0] =
SliceBuffer(arr_bitmap, arr.offset / 8, BitUtil::BytesForBits(arr.length));
} else {
// NOTE(review): this call is cut off in this view (trailing arguments
// missing)
CopyBitmap(arr_bitmap->data(), arr.offset, arr.length, bitmap_,
return Status::OK();
// Handles the case where two or more input arrays may contain nulls
Status PropagateMultiple() {
// More than one array. We use BitmapAnd to intersect their bitmaps
// Do not compute the intersection null count until it's needed
auto Accumulate = [&](const ArrayData& left, const ArrayData& right) {
// NOTE(review): this call is cut off in this view (destination argument
// missing)
BitmapAnd(left.buffers[0]->data(), left.offset, right.buffers[0]->data(),
right.offset, output_->length, output_->offset,
DCHECK_GT(arrays_with_nulls_.size(), 1);
// Seed the output bitmap with the & of the first two bitmaps
Accumulate(*arrays_with_nulls_[0], *arrays_with_nulls_[1]);
// Accumulate the rest
for (size_t i = 2; i < arrays_with_nulls_.size(); ++i) {
Accumulate(*output_, *arrays_with_nulls_[i]);
return Status::OK();
// Entry point: picks the propagation strategy for the inspected batch
Status Execute() {
if (is_all_null_) {
// An all-null value (scalar null or all-null array) gives us a short
// circuit opportunity
return AllNullShortCircuit();
// At this point, by construction we know that all of the values in
// arrays_with_nulls_ are arrays that are not all null. So there are a
// few cases:
// * No arrays. This is a no-op w/o preallocation but when the bitmap is
// pre-allocated we have to fill it with 1's
// * One array, whose bitmap can be zero-copied (w/o preallocation, and
// when no byte is split) or copied (split byte or w/ preallocation)
// * More than one array, we must compute the intersection of all the
// bitmaps
// BUT, if the output offset is nonzero for some reason, we copy into the
// output unconditionally
output_->null_count = kUnknownNullCount;
if (arrays_with_nulls_.empty()) {
// No arrays with nulls case
output_->null_count = 0;
if (bitmap_preallocated_) {
BitUtil::SetBitsTo(bitmap_, output_->offset, output_->length, true);
return Status::OK();
if (arrays_with_nulls_.size() == 1) {
return PropagateSingle();
return PropagateMultiple();
KernelContext* ctx_;
const ExecBatch& batch_;
// Input arrays whose validity must be combined into the output
std::vector<const ArrayData*> arrays_with_nulls_;
// Set when at least one input value is entirely null
bool is_all_null_ = false;
ArrayData* output_;
// Writable bytes of the output validity bitmap (has no default initializer)
uint8_t* bitmap_;
// True when `output` arrived with a validity buffer already in place
bool bitmap_preallocated_ = false;
// Builds a ChunkedArray of the given `type` from `values`, leaving out
// zero-length values.
// NOTE(review): the statement that appends each non-empty value to `arrays`
// is not visible in this view -- confirm against upstream.
std::shared_ptr<ChunkedArray> ToChunkedArray(const std::vector<Datum>& values,
const std::shared_ptr<DataType>& type) {
std::vector<std::shared_ptr<Array>> arrays;
for (const Datum& val : values) {
if (val.length() == 0) {
// Skip empty chunks
return std::make_shared<ChunkedArray>(std::move(arrays), type);
bool HaveChunkedArray(const std::vector<Datum>& values) {
for (const auto& value : values) {
if (value.kind() == Datum::CHUNKED_ARRAY) {
return true;
return false;
// Shared state and helpers for the concrete kernel executors, parameterized
// on the kernel type they run.
// NOTE(review): access specifiers and the openers of several macro calls are
// not visible in this view.
template <typename KernelType>
class KernelExecutorImpl : public KernelExecutor {
// Stores the kernel context and the kernel (cast to KernelType), then
// resolves the output descriptor from the kernel signature and the input
// descriptors.
Status Init(KernelContext* kernel_ctx, KernelInitArgs args) override {
kernel_ctx_ = kernel_ctx;
kernel_ = static_cast<const KernelType*>(args.kernel);
// Resolve the output descriptor for this kernel
// NOTE(review): the opening of this assignment macro call is missing here
output_descr_, kernel_->signature->out_type().Resolve(kernel_ctx_, args.inputs));
return Status::OK();
// This is overridden by the VectorExecutor
// Creates the batch iterator over `args`, chunked by the exec context's
// exec_chunksize()
virtual Status SetupArgIteration(const std::vector<Datum>& args) {
// NOTE(review): the opening of this assignment macro call is missing here
batch_iterator_, ExecBatchIterator::Make(args, exec_context()->exec_chunksize()));
return Status::OK();
// Creates the output ArrayData for `length` values, allocating the validity
// bitmap and data buffers that were marked for preallocation.
Result<std::shared_ptr<ArrayData>> PrepareOutput(int64_t length) {
auto out = std::make_shared<ArrayData>(output_descr_.type, length);
if (validity_preallocated_) {
ARROW_ASSIGN_OR_RAISE(out->buffers[0], kernel_ctx_->AllocateBitmap(length));
if (kernel_->null_handling == NullHandling::OUTPUT_NOT_NULL) {
out->null_count = 0;
// Data buffers start at index 1; index 0 is the validity bitmap
for (size_t i = 0; i < data_preallocated_.size(); ++i) {
const auto& prealloc = data_preallocated_[i];
if (prealloc.bit_width >= 0) {
// NOTE(review): this assignment macro call is cut off at both ends here
out->buffers[i + 1],
AllocateDataBuffer(kernel_ctx_, length + prealloc.added_length,
return out;
ExecContext* exec_context() { return kernel_ctx_->exec_context(); }
KernelState* state() { return kernel_ctx_->state(); }
// Not all of these members are used for every executor type
KernelContext* kernel_ctx_;
const KernelType* kernel_;
std::unique_ptr<ExecBatchIterator> batch_iterator_;
ValueDescr output_descr_;
// Number of buffers in the output type's layout (has no default initializer)
int output_num_buffers_;
// If true, then memory is preallocated for the validity bitmap with the same
// strategy as the data buffer(s).
bool validity_preallocated_ = false;
// The kernel writes into data buffers preallocated for these bit widths
// (a negative bit width indicates no preallocation);
std::vector<BufferPreallocation> data_preallocated_;
// Executor for ScalarKernel: runs the kernel batch by batch, writing either
// into one contiguous preallocated output or into one output per batch.
// NOTE(review): access specifiers and a number of statements (the calls that
// hand results to `listener`, the body of PrepareExecute, ...) are not
// visible in this view -- confirm against upstream before relying on the
// notes below.
class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
// Runs the kernel over every batch produced by the batch iterator
Status Execute(const std::vector<Datum>& args, ExecListener* listener) override {
ExecBatch batch;
while (batch_iterator_->Next(&batch)) {
RETURN_NOT_OK(ExecuteBatch(batch, listener));
if (preallocate_contiguous_) {
// If we preallocated one big chunk, since the kernel execution is
// completed, we can now emit it
return Status::OK();
// Packages the emitted outputs into the Datum returned to the caller
Datum WrapResults(const std::vector<Datum>& inputs,
const std::vector<Datum>& outputs) override {
if (output_descr_.shape == ValueDescr::SCALAR) {
DCHECK_GT(outputs.size(), 0);
if (outputs.size() == 1) {
// Return as SCALAR
return outputs[0];
} else {
// Return as COLLECTION
return outputs;
} else {
// If execution yielded multiple chunks (because large arrays were split
// based on the ExecContext parameters), then the result is a ChunkedArray
if (HaveChunkedArray(inputs) || outputs.size() > 1) {
return ToChunkedArray(outputs, output_descr_.type);
} else if (outputs.size() == 1) {
// Outputs have just one element
return outputs[0];
} else {
// XXX: In the case where no outputs are emitted, is returning a 0-length
// array always the correct move?
// NOTE(review): this call is cut off in this view
return MakeArrayOfNull(output_descr_.type, /*length=*/0,
// Prepares the output for one batch, applies the kernel's null handling, and
// invokes the kernel
Status ExecuteBatch(const ExecBatch& batch, ExecListener* listener) {
Datum out;
RETURN_NOT_OK(PrepareNextOutput(batch, &out));
if (output_descr_.shape == ValueDescr::ARRAY) {
ArrayData* out_arr = out.mutable_array();
if (kernel_->null_handling == NullHandling::INTERSECTION) {
RETURN_NOT_OK(PropagateNulls(kernel_ctx_, batch, out_arr));
} else if (kernel_->null_handling == NullHandling::OUTPUT_NOT_NULL) {
out_arr->null_count = 0;
} else {
if (kernel_->null_handling == NullHandling::INTERSECTION) {
// set scalar validity
// The output scalar is valid only if every input scalar is valid
out.scalar()->is_valid =
std::all_of(batch.values.begin(), batch.values.end(),
[](const Datum& input) { return input.scalar()->is_valid; });
} else if (kernel_->null_handling == NullHandling::OUTPUT_NOT_NULL) {
out.scalar()->is_valid = true;
RETURN_NOT_OK(kernel_->exec(kernel_ctx_, batch, &out));
if (!preallocate_contiguous_) {
// If we are producing chunked output rather than one big array, then
// emit each chunk as soon as it's available
return Status::OK();
Status PrepareExecute(const std::vector<Datum>& args) {
if (output_descr_.shape == ValueDescr::ARRAY) {
// If the executor is configured to produce a single large Array output for
// kernels supporting preallocation, then we do so up front and then
// iterate over slices of that large array. Otherwise, we preallocate prior
// to processing each batch emitted from the ExecBatchIterator
return Status::OK();
// We must accommodate two different modes of execution for preallocated
// execution
// * A single large ("contiguous") allocation that we populate with results
// on a chunkwise basis according to the ExecBatchIterator. This permits
// parallelization even if the objective is to obtain a single Array or
// ChunkedArray at the end
// * A standalone buffer preallocation for each chunk emitted from the
// ExecBatchIterator
// When data buffer preallocation is not possible (e.g. with BINARY / STRING
// outputs), then contiguous results are only possible if the input is
// contiguous.
Status PrepareNextOutput(const ExecBatch& batch, Datum* out) {
if (output_descr_.shape == ValueDescr::ARRAY) {
if (preallocate_contiguous_) {
// The output is already fully preallocated
// The iterator has already advanced past this batch, so step back by its
// length to find where the batch starts
const int64_t batch_start_position = batch_iterator_->position() - batch.length;
if (batch.length < batch_iterator_->length()) {
// If this is a partial execution, then we write into a slice of
// preallocated_
out->value = preallocated_->Slice(batch_start_position, batch.length);
} else {
// Otherwise write directly into preallocated_. The main difference
// computationally (versus the Slice approach) is that the null_count
// may not need to be recomputed in the result
out->value = preallocated_;
} else {
// We preallocate (maybe) only for the output of processing the current
// batch
ARROW_ASSIGN_OR_RAISE(out->value, PrepareOutput(batch.length));
} else {
// For scalar outputs, we set a null scalar of the correct type to
// communicate the output type to the kernel if needed
// XXX: Is there some way to avoid this step?
out->value = MakeNullScalar(output_descr_.type);
return Status::OK();
// Decides which output buffers are preallocated and whether one contiguous
// output of `total_length` values can be allocated up front
Status SetupPreallocation(int64_t total_length) {
output_num_buffers_ = static_cast<int>(output_descr_.type->layout().buffers.size());
// Decide if we need to preallocate memory for this kernel
validity_preallocated_ =
(kernel_->null_handling != NullHandling::COMPUTED_NO_PREALLOCATE &&
kernel_->null_handling != NullHandling::OUTPUT_NOT_NULL &&
output_descr_.type->id() != Type::NA);
if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) {
ComputeDataPreallocate(*output_descr_.type, &data_preallocated_);
// Contiguous preallocation only possible on non-nested types if all
// buffers are preallocated. Otherwise, we must go chunk-by-chunk.
// Some kernels are also unable to write into sliced outputs, so we respect the
// kernel's attributes.
preallocate_contiguous_ =
(exec_context()->preallocate_contiguous() && kernel_->can_write_into_slices &&
validity_preallocated_ && !is_nested(output_descr_.type->id()) &&
!is_dictionary(output_descr_.type->id()) &&
data_preallocated_.size() == static_cast<size_t>(output_num_buffers_ - 1) &&
std::all_of(data_preallocated_.begin(), data_preallocated_.end(),
[](const BufferPreallocation& prealloc) {
return prealloc.bit_width >= 0;
if (preallocate_contiguous_) {
ARROW_ASSIGN_OR_RAISE(preallocated_, PrepareOutput(total_length));
return Status::OK();
// If true, and the kernel and output type supports preallocation (for both
// the validity and data buffers), then we allocate one big array and then
// iterate through it while executing the kernel in chunks
bool preallocate_contiguous_ = false;
// For storing a contiguous preallocation per above. Unused otherwise
std::shared_ptr<ArrayData> preallocated_;
// Packs all of `args` into a single ExecBatch without slicing: `out->values`
// becomes a copy of `args` and `out->length` the largest length() among the
// scalar, array and chunked-array arguments.
// NOTE(review): any handling of other Datum kinds (a default case) is not
// visible in this view.
Status PackBatchNoChunks(const std::vector<Datum>& args, ExecBatch* out) {
int64_t length = 0;
for (const auto& arg : args) {
switch (arg.kind()) {
case Datum::SCALAR:
case Datum::ARRAY:
case Datum::CHUNKED_ARRAY:
length = std::max(arg.length(), length);
out->length = length;
out->values = args;
return Status::OK();
// Executor for VectorKernel: runs the kernel chunk by chunk when the kernel
// allows it, otherwise once over all arguments, then finalizes.
// NOTE(review): access specifiers and the statements that hand results to
// `listener` or store them in results_ are not visible in this view.
class VectorExecutor : public KernelExecutorImpl<VectorKernel> {
Status Execute(const std::vector<Datum>& args, ExecListener* listener) override {
ExecBatch batch;
if (kernel_->can_execute_chunkwise) {
while (batch_iterator_->Next(&batch)) {
RETURN_NOT_OK(ExecuteBatch(batch, listener));
} else {
// The kernel needs all of its input at once
RETURN_NOT_OK(PackBatchNoChunks(args, &batch));
RETURN_NOT_OK(ExecuteBatch(batch, listener));
return Finalize(listener);
// Packages the emitted outputs into the Datum returned to the caller
Datum WrapResults(const std::vector<Datum>& inputs,
const std::vector<Datum>& outputs) override {
// If execution yielded multiple chunks (because large arrays were split
// based on the ExecContext parameters), then the result is a ChunkedArray
if (kernel_->output_chunked && (HaveChunkedArray(inputs) || outputs.size() > 1)) {
return ToChunkedArray(outputs, output_descr_.type);
} else if (outputs.size() == 1) {
// Outputs have just one element
return outputs[0];
} else {
// XXX: In the case where no outputs are emitted, is returning a 0-length
// array always the correct move?
return MakeArrayOfNull(output_descr_.type, /*length=*/0).ValueOrDie();
// Prepares the output for one batch, propagates nulls if the kernel asks for
// INTERSECTION handling, and invokes the kernel
Status ExecuteBatch(const ExecBatch& batch, ExecListener* listener) {
if (batch.length == 0) {
// Skip empty batches. This may only happen when not using
// ExecBatchIterator
return Status::OK();
Datum out;
if (output_descr_.shape == ValueDescr::ARRAY) {
// We preallocate (maybe) only for the output of processing the current
// batch
ARROW_ASSIGN_OR_RAISE(out.value, PrepareOutput(batch.length));
if (kernel_->null_handling == NullHandling::INTERSECTION &&
output_descr_.shape == ValueDescr::ARRAY) {
RETURN_NOT_OK(PropagateNulls(kernel_ctx_, batch, out.mutable_array()));
RETURN_NOT_OK(kernel_->exec(kernel_ctx_, batch, &out));
if (!kernel_->finalize) {
// If there is no result finalizer (e.g. for hash-based functions), we can
// emit the processed batch right away rather than waiting
} else {
return Status::OK();
// Runs the kernel's finalizer, if any, over the accumulated results
Status Finalize(ExecListener* listener) {
if (kernel_->finalize) {
// Intermediate results require post-processing after the execution is
// completed (possibly involving some accumulated state)
RETURN_NOT_OK(kernel_->finalize(kernel_ctx_, &results_));
for (const auto& result : results_) {
return Status::OK();
// Only creates a batch iterator when the kernel can execute chunkwise
Status SetupArgIteration(const std::vector<Datum>& args) override {
if (kernel_->can_execute_chunkwise) {
ARROW_ASSIGN_OR_RAISE(batch_iterator_, ExecBatchIterator::Make(
args, exec_context()->exec_chunksize()));
return Status::OK();
// Decides which output buffers are preallocated for each batch
Status PrepareExecute(const std::vector<Datum>& args) {
output_num_buffers_ = static_cast<int>(output_descr_.type->layout().buffers.size());
// Decide if we need to preallocate memory for this kernel
validity_preallocated_ =
(kernel_->null_handling != NullHandling::COMPUTED_NO_PREALLOCATE &&
kernel_->null_handling != NullHandling::OUTPUT_NOT_NULL);
if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) {
ComputeDataPreallocate(*output_descr_.type, &data_preallocated_);
return Status::OK();
// Results passed to the kernel's finalizer in Finalize()
std::vector<Datum> results_;
// Executor for ScalarAggregateKernel: consumes the input batch by batch and
// produces a single finalized output.
// NOTE(review): access specifiers and several statements (the call that
// consumes each batch, the hand-off of `out` to `listener`, the opener of the
// assignment macro in Consume()) are not visible in this view.
class ScalarAggExecutor : public KernelExecutorImpl<ScalarAggregateKernel> {
// Remembers the input descriptors and options so that Consume() can create
// per-batch kernel state, then defers to the base class
Status Init(KernelContext* ctx, KernelInitArgs args) override {
// NOTE(review): this keeps the address of `args.inputs` while `args` is a
// by-value parameter -- presumably `inputs` refers to storage owned by the
// caller; verify its lifetime.
input_descrs_ = &args.inputs;
options_ = args.options;
return KernelExecutorImpl<ScalarAggregateKernel>::Init(ctx, args);
Status Execute(const std::vector<Datum>& args, ExecListener* listener) override {
ExecBatch batch;
while (batch_iterator_->Next(&batch)) {
// TODO: implement parallelism
if (batch.length > 0) {
Datum out;
RETURN_NOT_OK(kernel_->finalize(kernel_ctx_, &out));
return Status::OK();
// Exactly one output is expected; it is returned as-is
Datum WrapResults(const std::vector<Datum>&,
const std::vector<Datum>& outputs) override {
DCHECK_EQ(1, outputs.size());
return outputs[0];
// Consumes one batch into freshly initialized state and merges that state
// into the executor's own state
Status Consume(const ExecBatch& batch) {
// FIXME(ARROW-11840) don't merge *any* aggregates for every batch
auto batch_state,
kernel_->init(kernel_ctx_, {kernel_, *input_descrs_, options_}));
if (batch_state == nullptr) {
return Status::Invalid("ScalarAggregation requires non-null kernel state");
// A separate context is used for the per-batch state
KernelContext batch_ctx(exec_context());
RETURN_NOT_OK(kernel_->consume(&batch_ctx, batch));
RETURN_NOT_OK(kernel_->merge(kernel_ctx_, std::move(*batch_state), state()));
return Status::OK();
// Set in Init(); used to initialize per-batch state in Consume()
const std::vector<ValueDescr>* input_descrs_;
const FunctionOptions* options_;
// Creates an executor of type ExecutorType for `func`, returned through the
// KernelExecutor base type. `func` is downcast to ExecutorType::FunctionType;
// a debug check verifies that its kind matches ExecutorType::function_kind.
template <typename ExecutorType,
typename FunctionType = typename ExecutorType::FunctionType>
Result<std::unique_ptr<KernelExecutor>> MakeExecutor(ExecContext* ctx,
const Function* func,
const FunctionOptions* options) {
DCHECK_EQ(ExecutorType::function_kind, func->kind());
auto typed_func = checked_cast<const FunctionType*>(func);
return std::unique_ptr<KernelExecutor>(new ExecutorType(ctx, typed_func, options));
} // namespace
// Fills the validity bitmap of `output` from the validity of the values in
// `batch`, by delegating to NullPropagator.
//
// `output` must be non-null and have at least one buffer slot (both are
// debug-checked). Returns Status::Invalid when `output` has a non-zero offset
// but no validity buffer.
Status PropagateNulls(KernelContext* ctx, const ExecBatch& batch, ArrayData* output) {
DCHECK_NE(nullptr, output);
DCHECK_GT(output->buffers.size(), 0);
if (output->type->id() == Type::NA) {
// Null output type is a no-op (rare when this would happen but we at least
// will test for it)
return Status::OK();
// This function is ONLY able to write into output with non-zero offset
// when the bitmap is preallocated. This could be a DCHECK but returning
// error Status for now for emphasis
if (output->offset != 0 && output->buffers[0] == nullptr) {
return Status::Invalid(
"Can only propagate nulls into pre-allocated memory "
"when the output offset is non-zero");
NullPropagator propagator(ctx, batch, output);
return propagator.Execute();
// Creates a default-constructed detail::ScalarExecutor, returned through the
// KernelExecutor base type.
std::unique_ptr<KernelExecutor> KernelExecutor::MakeScalar() {
return ::arrow::internal::make_unique<detail::ScalarExecutor>();
// Creates a default-constructed detail::VectorExecutor, returned through the
// KernelExecutor base type.
std::unique_ptr<KernelExecutor> KernelExecutor::MakeVector() {
return ::arrow::internal::make_unique<detail::VectorExecutor>();
// Creates a default-constructed detail::ScalarAggExecutor, returned through
// the KernelExecutor base type.
std::unique_ptr<KernelExecutor> KernelExecutor::MakeScalarAggregate() {
return ::arrow::internal::make_unique<detail::ScalarAggExecutor>();
} // namespace detail
// Binds the context to `pool` and to `func_registry`. Passing a null registry
// selects the one returned by GetFunctionRegistry().
ExecContext::ExecContext(MemoryPool* pool, FunctionRegistry* func_registry)
    : pool_(pool) {
  if (func_registry == nullptr) {
    this->func_registry_ = GetFunctionRegistry();
  } else {
    this->func_registry_ = func_registry;
  }
}
// Returns the CpuInfo instance obtained from CpuInfo::GetInstance().
CpuInfo* ExecContext::cpu_info() const {
  auto* info = CpuInfo::GetInstance();
  return info;
}
// ----------------------------------------------------------------------
// SelectionVector
// Wraps `data` as a selection vector. Debug checks require the data to be of
// type INT32 and to contain no nulls; indices_ then points at the int32
// values of buffer 1.
SelectionVector::SelectionVector(std::shared_ptr<ArrayData> data)
: data_(std::move(data)) {
DCHECK_EQ(Type::INT32, data_->type->id());
DCHECK_EQ(0, data_->GetNullCount());
indices_ = data_->GetValues<int32_t>(1);
// Wraps the ArrayData backing `arr` by delegating to the
// std::shared_ptr<ArrayData> constructor, which performs the type and
// null-count checks.
SelectionVector::SelectionVector(const Array& arr) : SelectionVector(arr.data()) {}
// Number of indices in the selection vector, i.e. the length of the wrapped
// data narrowed to int32_t.
int32_t SelectionVector::length() const {
  const auto num_indices = static_cast<int32_t>(data_->length);
  return num_indices;
}
// Not implemented: always returns Status::NotImplemented and does not read
// `arr`.
Result<std::shared_ptr<SelectionVector>> SelectionVector::FromMask(
const BooleanArray& arr) {
return Status::NotImplemented("FromMask");
// Looks up the function named `func_name` and executes it on `args` with
// `options`.
//
// When `ctx` is null, an ExecContext is default-constructed on the stack and
// the call is re-issued with it.
Result<Datum> CallFunction(const std::string& func_name, const std::vector<Datum>& args,
const FunctionOptions* options, ExecContext* ctx) {
if (ctx == nullptr) {
ExecContext default_ctx;
return CallFunction(func_name, args, options, &default_ctx);
// NOTE(review): this macro call is cut off in this view -- the expression
// that looks up `func` (presumably via the context's function registry) is
// missing; restore it from upstream.
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<const Function> func,
return func->Execute(args, options, ctx);
// Convenience overload: calls the function named `func_name` on `args` with
// no options (a null FunctionOptions pointer).
Result<Datum> CallFunction(const std::string& func_name, const std::vector<Datum>& args,
ExecContext* ctx) {
return CallFunction(func_name, args, /*options=*/nullptr, ctx);
} // namespace compute
} // namespace arrow