blob: c56e6471c97bb99dafc08032907b2ce7a3b3c3e1 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <cstring>
#include <memory>
#include <vector>
#include <gtest/gtest.h>
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"
#include "arrow/array/array_base.h"
#include "arrow/array/data.h"
#include "arrow/buffer.h"
#include "arrow/chunked_array.h"
#include "arrow/compute/exec.h"
#include "arrow/compute/exec_internal.h"
#include "arrow/compute/function.h"
#include "arrow/compute/kernel.h"
#include "arrow/compute/registry.h"
#include "arrow/memory_pool.h"
#include "arrow/scalar.h"
#include "arrow/status.h"
#include "arrow/type.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/bitmap_ops.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/cpu_info.h"
#include "arrow/util/logging.h"
namespace arrow {
using internal::checked_cast;
namespace compute {
namespace detail {
TEST(ExecContext, BasicWorkings) {
{
ExecContext ctx;
ASSERT_EQ(GetFunctionRegistry(), ctx.func_registry());
ASSERT_EQ(default_memory_pool(), ctx.memory_pool());
ASSERT_EQ(std::numeric_limits<int64_t>::max(), ctx.exec_chunksize());
ASSERT_TRUE(ctx.use_threads());
ASSERT_EQ(internal::CpuInfo::GetInstance(), ctx.cpu_info());
}
// Now, let's customize all the things
LoggingMemoryPool my_pool(default_memory_pool());
std::unique_ptr<FunctionRegistry> custom_reg = FunctionRegistry::Make();
ExecContext ctx(&my_pool, custom_reg.get());
ASSERT_EQ(custom_reg.get(), ctx.func_registry());
ASSERT_EQ(&my_pool, ctx.memory_pool());
ctx.set_exec_chunksize(1 << 20);
ASSERT_EQ(1 << 20, ctx.exec_chunksize());
ctx.set_use_threads(false);
ASSERT_FALSE(ctx.use_threads());
}
TEST(SelectionVector, Basics) {
auto indices = ArrayFromJSON(int32(), "[0, 3]");
auto sel_vector = std::make_shared<SelectionVector>(*indices);
ASSERT_EQ(indices->length(), sel_vector->length());
ASSERT_EQ(3, sel_vector->indices()[1]);
}
void AssertValidityZeroExtraBits(const ArrayData& arr) {
const Buffer& buf = *arr.buffers[0];
const int64_t bit_extent = ((arr.offset + arr.length + 7) / 8) * 8;
for (int64_t i = arr.offset + arr.length; i < bit_extent; ++i) {
EXPECT_FALSE(BitUtil::GetBit(buf.data(), i)) << i;
}
}
class TestComputeInternals : public ::testing::Test {
public:
void SetUp() {
rng_.reset(new random::RandomArrayGenerator(/*seed=*/0));
ResetContexts();
}
void ResetContexts() {
exec_ctx_.reset(new ExecContext(default_memory_pool()));
ctx_.reset(new KernelContext(exec_ctx_.get()));
}
std::shared_ptr<Array> GetUInt8Array(int64_t size, double null_probability = 0.1) {
return rng_->UInt8(size, /*min=*/0, /*max=*/100, null_probability);
}
std::shared_ptr<Array> GetInt32Array(int64_t size, double null_probability = 0.1) {
return rng_->Int32(size, /*min=*/0, /*max=*/1000, null_probability);
}
std::shared_ptr<Array> GetFloat64Array(int64_t size, double null_probability = 0.1) {
return rng_->Float64(size, /*min=*/0, /*max=*/1000, null_probability);
}
std::shared_ptr<ChunkedArray> GetInt32Chunked(const std::vector<int>& sizes) {
std::vector<std::shared_ptr<Array>> chunks;
for (auto size : sizes) {
chunks.push_back(GetInt32Array(size));
}
return std::make_shared<ChunkedArray>(std::move(chunks));
}
protected:
std::unique_ptr<ExecContext> exec_ctx_;
std::unique_ptr<KernelContext> ctx_;
std::unique_ptr<random::RandomArrayGenerator> rng_;
};
class TestPropagateNulls : public TestComputeInternals {};
TEST_F(TestPropagateNulls, UnknownNullCountWithNullsZeroCopies) {
const int64_t length = 16;
constexpr uint8_t validity_bitmap[8] = {254, 0, 0, 0, 0, 0, 0, 0};
auto nulls = std::make_shared<Buffer>(validity_bitmap, 8);
ArrayData output(boolean(), length, {nullptr, nullptr});
ArrayData input(boolean(), length, {nulls, nullptr}, kUnknownNullCount);
ExecBatch batch({input}, length);
ASSERT_OK(PropagateNulls(ctx_.get(), batch, &output));
ASSERT_EQ(nulls.get(), output.buffers[0].get());
ASSERT_EQ(kUnknownNullCount, output.null_count);
ASSERT_EQ(9, output.GetNullCount());
}
TEST_F(TestPropagateNulls, UnknownNullCountWithoutNulls) {
const int64_t length = 16;
constexpr uint8_t validity_bitmap[8] = {255, 255, 0, 0, 0, 0, 0, 0};
auto nulls = std::make_shared<Buffer>(validity_bitmap, 8);
ArrayData output(boolean(), length, {nullptr, nullptr});
ArrayData input(boolean(), length, {nulls, nullptr}, kUnknownNullCount);
ExecBatch batch({input}, length);
ASSERT_OK(PropagateNulls(ctx_.get(), batch, &output));
EXPECT_EQ(-1, output.null_count);
EXPECT_EQ(nulls.get(), output.buffers[0].get());
}
TEST_F(TestPropagateNulls, SetAllNulls) {
const int64_t length = 16;
auto CheckSetAllNull = [&](std::vector<Datum> values, bool preallocate) {
// Make fresh bitmap with all 1's
uint8_t bitmap_data[2] = {255, 255};
auto preallocated_mem = std::make_shared<MutableBuffer>(bitmap_data, 2);
std::vector<std::shared_ptr<Buffer>> buffers(2);
if (preallocate) {
buffers[0] = preallocated_mem;
}
ArrayData output(boolean(), length, buffers);
ExecBatch batch(values, length);
ASSERT_OK(PropagateNulls(ctx_.get(), batch, &output));
if (preallocate) {
// Ensure that buffer object the same when we pass in preallocated memory
ASSERT_EQ(preallocated_mem.get(), output.buffers[0].get());
}
ASSERT_NE(nullptr, output.buffers[0]);
uint8_t expected[2] = {0, 0};
const Buffer& out_buf = *output.buffers[0];
ASSERT_EQ(0, std::memcmp(out_buf.data(), expected, out_buf.size()));
};
// There is a null scalar
std::shared_ptr<Scalar> i32_val = std::make_shared<Int32Scalar>(3);
std::vector<Datum> vals = {i32_val, MakeNullScalar(boolean())};
CheckSetAllNull(vals, true);
CheckSetAllNull(vals, false);
const double true_prob = 0.5;
vals[0] = rng_->Boolean(length, true_prob);
CheckSetAllNull(vals, true);
CheckSetAllNull(vals, false);
auto arr_all_nulls = rng_->Boolean(length, true_prob, /*null_probability=*/1);
// One value is all null
vals = {rng_->Boolean(length, true_prob, /*null_probability=*/0.5), arr_all_nulls};
CheckSetAllNull(vals, true);
CheckSetAllNull(vals, false);
// A value is NullType
std::shared_ptr<Array> null_arr = std::make_shared<NullArray>(length);
vals = {rng_->Boolean(length, true_prob), null_arr};
CheckSetAllNull(vals, true);
CheckSetAllNull(vals, false);
// Other nitty-gritty scenarios
{
// An all-null bitmap is zero-copied over, even though there is a
// null-scalar earlier in the batch
ArrayData output(boolean(), length, {nullptr, nullptr});
ExecBatch batch({MakeNullScalar(boolean()), arr_all_nulls}, length);
ASSERT_OK(PropagateNulls(ctx_.get(), batch, &output));
ASSERT_EQ(arr_all_nulls->data()->buffers[0].get(), output.buffers[0].get());
}
}
TEST_F(TestPropagateNulls, SingleValueWithNulls) {
// Input offset is non-zero (0 mod 8 and nonzero mod 8 cases)
const int64_t length = 100;
auto arr = rng_->Boolean(length, 0.5, /*null_probability=*/0.5);
auto CheckSliced = [&](int64_t offset, bool preallocate = false,
int64_t out_offset = 0) {
// Unaligned bitmap, zero copy not possible
auto sliced = arr->Slice(offset);
std::vector<Datum> vals = {sliced};
ArrayData output(boolean(), vals[0].length(), {nullptr, nullptr});
output.offset = out_offset;
ExecBatch batch(vals, vals[0].length());
std::shared_ptr<Buffer> preallocated_bitmap;
if (preallocate) {
ASSERT_OK_AND_ASSIGN(
preallocated_bitmap,
AllocateBuffer(BitUtil::BytesForBits(sliced->length() + out_offset)));
std::memset(preallocated_bitmap->mutable_data(), 0, preallocated_bitmap->size());
output.buffers[0] = preallocated_bitmap;
} else {
ASSERT_EQ(0, output.offset);
}
ASSERT_OK(PropagateNulls(ctx_.get(), batch, &output));
if (!preallocate) {
const Buffer* parent_buf = arr->data()->buffers[0].get();
if (offset == 0) {
// Validity bitmap same, no slice
ASSERT_EQ(parent_buf, output.buffers[0].get());
} else if (offset % 8 == 0) {
// Validity bitmap sliced
ASSERT_NE(parent_buf, output.buffers[0].get());
ASSERT_EQ(parent_buf, output.buffers[0]->parent().get());
} else {
// New memory for offset not 0 mod 8
ASSERT_NE(parent_buf, output.buffers[0].get());
ASSERT_EQ(nullptr, output.buffers[0]->parent());
}
} else {
// preallocated, so check that the validity bitmap is unbothered
ASSERT_EQ(preallocated_bitmap.get(), output.buffers[0].get());
}
ASSERT_EQ(arr->Slice(offset)->null_count(), output.GetNullCount());
ASSERT_TRUE(internal::BitmapEquals(output.buffers[0]->data(), output.offset,
sliced->null_bitmap_data(), sliced->offset(),
output.length));
AssertValidityZeroExtraBits(output);
};
CheckSliced(8);
CheckSliced(7);
CheckSliced(8, /*preallocated=*/true);
CheckSliced(7, true);
CheckSliced(8, true, /*offset=*/4);
CheckSliced(7, true, 4);
}
TEST_F(TestPropagateNulls, ZeroCopyWhenZeroNullsOnOneInput) {
const int64_t length = 16;
constexpr uint8_t validity_bitmap[8] = {254, 0, 0, 0, 0, 0, 0, 0};
auto nulls = std::make_shared<Buffer>(validity_bitmap, 8);
ArrayData some_nulls(boolean(), 16, {nulls, nullptr}, /*null_count=*/9);
ArrayData no_nulls(boolean(), length, {nullptr, nullptr}, /*null_count=*/0);
ArrayData output(boolean(), length, {nullptr, nullptr});
ExecBatch batch({some_nulls, no_nulls}, length);
ASSERT_OK(PropagateNulls(ctx_.get(), batch, &output));
ASSERT_EQ(nulls.get(), output.buffers[0].get());
ASSERT_EQ(9, output.null_count);
// Flip order of args
output = ArrayData(boolean(), length, {nullptr, nullptr});
batch.values = {no_nulls, no_nulls, some_nulls};
ASSERT_OK(PropagateNulls(ctx_.get(), batch, &output));
ASSERT_EQ(nulls.get(), output.buffers[0].get());
ASSERT_EQ(9, output.null_count);
// Check that preallocated memory is not clobbered
uint8_t bitmap_data[2] = {0, 0};
auto preallocated_mem = std::make_shared<MutableBuffer>(bitmap_data, 2);
output.null_count = kUnknownNullCount;
output.buffers[0] = preallocated_mem;
ASSERT_OK(PropagateNulls(ctx_.get(), batch, &output));
ASSERT_EQ(preallocated_mem.get(), output.buffers[0].get());
ASSERT_EQ(9, output.null_count);
ASSERT_EQ(254, bitmap_data[0]);
ASSERT_EQ(0, bitmap_data[1]);
}
TEST_F(TestPropagateNulls, IntersectsNulls) {
const int64_t length = 16;
// 0b01111111 0b11001111
constexpr uint8_t bitmap1[8] = {127, 207, 0, 0, 0, 0, 0, 0};
// 0b11111110 0b01111111
constexpr uint8_t bitmap2[8] = {254, 127, 0, 0, 0, 0, 0, 0};
// 0b11101111 0b11111110
constexpr uint8_t bitmap3[8] = {239, 254, 0, 0, 0, 0, 0, 0};
ArrayData arr1(boolean(), length, {std::make_shared<Buffer>(bitmap1, 8), nullptr});
ArrayData arr2(boolean(), length, {std::make_shared<Buffer>(bitmap2, 8), nullptr});
ArrayData arr3(boolean(), length, {std::make_shared<Buffer>(bitmap3, 8), nullptr});
auto CheckCase = [&](std::vector<Datum> values, int64_t ex_null_count,
const uint8_t* ex_bitmap, bool preallocate = false,
int64_t output_offset = 0) {
ExecBatch batch(values, length);
std::shared_ptr<Buffer> nulls;
if (preallocate) {
// Make the buffer one byte bigger so we can have non-zero offsets
ASSERT_OK_AND_ASSIGN(nulls, AllocateBuffer(3));
std::memset(nulls->mutable_data(), 0, nulls->size());
} else {
// non-zero output offset not permitted unless the output memory is
// preallocated
ASSERT_EQ(0, output_offset);
}
ArrayData output(boolean(), length, {nulls, nullptr});
output.offset = output_offset;
ASSERT_OK(PropagateNulls(ctx_.get(), batch, &output));
// Preallocated memory used
if (preallocate) {
ASSERT_EQ(nulls.get(), output.buffers[0].get());
}
EXPECT_EQ(kUnknownNullCount, output.null_count);
EXPECT_EQ(ex_null_count, output.GetNullCount());
const auto& out_buffer = *output.buffers[0];
ASSERT_TRUE(internal::BitmapEquals(out_buffer.data(), output_offset, ex_bitmap,
/*ex_offset=*/0, length));
// Now check that the rest of the bits in out_buffer are still 0
AssertValidityZeroExtraBits(output);
};
// 0b01101110 0b01001110
uint8_t expected1[2] = {110, 78};
CheckCase({arr1, arr2, arr3}, 7, expected1);
CheckCase({arr1, arr2, arr3}, 7, expected1, /*preallocate=*/true);
CheckCase({arr1, arr2, arr3}, 7, expected1, /*preallocate=*/true,
/*output_offset=*/4);
// 0b01111110 0b01001111
uint8_t expected2[2] = {126, 79};
CheckCase({arr1, arr2}, 5, expected2);
CheckCase({arr1, arr2}, 5, expected2, /*preallocate=*/true,
/*output_offset=*/4);
}
TEST_F(TestPropagateNulls, NullOutputTypeNoop) {
// Ensure we leave the buffers alone when the output type is null()
const int64_t length = 100;
ExecBatch batch({rng_->Boolean(100, 0.5, 0.5)}, length);
ArrayData output(null(), length, {nullptr});
ASSERT_OK(PropagateNulls(ctx_.get(), batch, &output));
ASSERT_EQ(nullptr, output.buffers[0]);
}
// ----------------------------------------------------------------------
// ExecBatchIterator
class TestExecBatchIterator : public TestComputeInternals {
public:
void SetupIterator(std::vector<Datum> args,
int64_t max_chunksize = kDefaultMaxChunksize) {
ASSERT_OK_AND_ASSIGN(iterator_,
ExecBatchIterator::Make(std::move(args), max_chunksize));
}
void CheckIteration(const std::vector<Datum>& args, int chunksize,
const std::vector<int>& ex_batch_sizes) {
SetupIterator(args, chunksize);
ExecBatch batch;
int64_t position = 0;
for (size_t i = 0; i < ex_batch_sizes.size(); ++i) {
ASSERT_EQ(position, iterator_->position());
ASSERT_TRUE(iterator_->Next(&batch));
ASSERT_EQ(ex_batch_sizes[i], batch.length);
for (size_t j = 0; j < args.size(); ++j) {
switch (args[j].kind()) {
case Datum::SCALAR:
ASSERT_TRUE(args[j].scalar()->Equals(batch[j].scalar()));
break;
case Datum::ARRAY:
AssertArraysEqual(*args[j].make_array()->Slice(position, batch.length),
*batch[j].make_array());
break;
case Datum::CHUNKED_ARRAY: {
const ChunkedArray& carr = *args[j].chunked_array();
if (batch.length == 0) {
ASSERT_EQ(0, carr.length());
} else {
auto arg_slice = carr.Slice(position, batch.length);
// The sliced ChunkedArrays should only ever be 1 chunk
ASSERT_EQ(1, arg_slice->num_chunks());
AssertArraysEqual(*arg_slice->chunk(0), *batch[j].make_array());
}
} break;
default:
break;
}
}
position += ex_batch_sizes[i];
}
// Ensure that the iterator is exhausted
ASSERT_FALSE(iterator_->Next(&batch));
ASSERT_EQ(iterator_->length(), iterator_->position());
}
protected:
std::unique_ptr<ExecBatchIterator> iterator_;
};
TEST_F(TestExecBatchIterator, Basics) {
const int64_t length = 100;
// Simple case with a single chunk
std::vector<Datum> args = {Datum(GetInt32Array(length)), Datum(GetFloat64Array(length)),
Datum(std::make_shared<Int32Scalar>(3))};
SetupIterator(args);
ExecBatch batch;
ASSERT_TRUE(iterator_->Next(&batch));
ASSERT_EQ(3, batch.values.size());
ASSERT_EQ(3, batch.num_values());
ASSERT_EQ(length, batch.length);
std::vector<ValueDescr> descrs = batch.GetDescriptors();
ASSERT_EQ(ValueDescr::Array(int32()), descrs[0]);
ASSERT_EQ(ValueDescr::Array(float64()), descrs[1]);
ASSERT_EQ(ValueDescr::Scalar(int32()), descrs[2]);
AssertArraysEqual(*args[0].make_array(), *batch[0].make_array());
AssertArraysEqual(*args[1].make_array(), *batch[1].make_array());
ASSERT_TRUE(args[2].scalar()->Equals(batch[2].scalar()));
ASSERT_EQ(length, iterator_->position());
ASSERT_FALSE(iterator_->Next(&batch));
// Split into chunks of size 16
CheckIteration(args, /*chunksize=*/16, {16, 16, 16, 16, 16, 16, 4});
}
TEST_F(TestExecBatchIterator, InputValidation) {
std::vector<Datum> args = {Datum(GetInt32Array(10)), Datum(GetInt32Array(9))};
ASSERT_RAISES(Invalid, ExecBatchIterator::Make(args));
args = {Datum(GetInt32Array(9)), Datum(GetInt32Array(10))};
ASSERT_RAISES(Invalid, ExecBatchIterator::Make(args));
args = {Datum(GetInt32Array(10))};
ASSERT_OK_AND_ASSIGN(auto iterator, ExecBatchIterator::Make(args));
ASSERT_EQ(10, iterator->max_chunksize());
}
TEST_F(TestExecBatchIterator, ChunkedArrays) {
std::vector<Datum> args = {Datum(GetInt32Chunked({0, 20, 10})),
Datum(GetInt32Chunked({15, 15})), Datum(GetInt32Array(30)),
Datum(std::make_shared<Int32Scalar>(5)),
Datum(MakeNullScalar(boolean()))};
CheckIteration(args, /*chunksize=*/10, {10, 5, 5, 10});
CheckIteration(args, /*chunksize=*/20, {15, 5, 10});
CheckIteration(args, /*chunksize=*/30, {15, 5, 10});
}
TEST_F(TestExecBatchIterator, ZeroLengthInputs) {
auto carr = std::shared_ptr<ChunkedArray>(new ChunkedArray({}, int32()));
auto CheckArgs = [&](const std::vector<Datum>& args) {
auto iterator = ExecBatchIterator::Make(args).ValueOrDie();
ExecBatch batch;
ASSERT_FALSE(iterator->Next(&batch));
};
// Zero-length ChunkedArray with zero chunks
std::vector<Datum> args = {Datum(carr)};
CheckArgs(args);
// Zero-length array
args = {Datum(GetInt32Array(0))};
CheckArgs(args);
// ChunkedArray with single empty chunk
args = {Datum(GetInt32Chunked({0}))};
CheckArgs(args);
}
// ----------------------------------------------------------------------
// Scalar function execution
Status ExecCopy(KernelContext*, const ExecBatch& batch, Datum* out) {
DCHECK_EQ(1, batch.num_values());
const auto& type = checked_cast<const FixedWidthType&>(*batch[0].type());
int value_size = type.bit_width() / 8;
const ArrayData& arg0 = *batch[0].array();
ArrayData* out_arr = out->mutable_array();
uint8_t* dst = out_arr->buffers[1]->mutable_data() + out_arr->offset * value_size;
const uint8_t* src = arg0.buffers[1]->data() + arg0.offset * value_size;
std::memcpy(dst, src, batch.length * value_size);
return Status::OK();
}
Status ExecComputedBitmap(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
// Propagate nulls not used. Check that the out bitmap isn't the same already
// as the input bitmap
const ArrayData& arg0 = *batch[0].array();
ArrayData* out_arr = out->mutable_array();
if (internal::CountSetBits(arg0.buffers[0]->data(), arg0.offset, batch.length) > 0) {
// Check that the bitmap has not been already copied over
DCHECK(!internal::BitmapEquals(arg0.buffers[0]->data(), arg0.offset,
out_arr->buffers[0]->data(), out_arr->offset,
batch.length));
}
internal::CopyBitmap(arg0.buffers[0]->data(), arg0.offset, batch.length,
out_arr->buffers[0]->mutable_data(), out_arr->offset);
return ExecCopy(ctx, batch, out);
}
Status ExecNoPreallocatedData(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
// Validity preallocated, but not the data
ArrayData* out_arr = out->mutable_array();
DCHECK_EQ(0, out_arr->offset);
const auto& type = checked_cast<const FixedWidthType&>(*batch[0].type());
int value_size = type.bit_width() / 8;
Status s = (ctx->Allocate(out_arr->length * value_size).Value(&out_arr->buffers[1]));
DCHECK_OK(s);
return ExecCopy(ctx, batch, out);
}
Status ExecNoPreallocatedAnything(KernelContext* ctx, const ExecBatch& batch,
Datum* out) {
// Neither validity nor data preallocated
ArrayData* out_arr = out->mutable_array();
DCHECK_EQ(0, out_arr->offset);
Status s = (ctx->AllocateBitmap(out_arr->length).Value(&out_arr->buffers[0]));
DCHECK_OK(s);
const ArrayData& arg0 = *batch[0].array();
internal::CopyBitmap(arg0.buffers[0]->data(), arg0.offset, batch.length,
out_arr->buffers[0]->mutable_data(), /*offset=*/0);
// Reuse the kernel that allocates the data
return ExecNoPreallocatedData(ctx, batch, out);
}
struct ExampleOptions : public FunctionOptions {
std::shared_ptr<Scalar> value;
explicit ExampleOptions(std::shared_ptr<Scalar> value) : value(std::move(value)) {}
};
struct ExampleState : public KernelState {
std::shared_ptr<Scalar> value;
explicit ExampleState(std::shared_ptr<Scalar> value) : value(std::move(value)) {}
};
Result<std::unique_ptr<KernelState>> InitStateful(KernelContext*,
const KernelInitArgs& args) {
auto func_options = static_cast<const ExampleOptions*>(args.options);
return std::unique_ptr<KernelState>(new ExampleState{func_options->value});
}
Status ExecStateful(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
// We take the value from the state and multiply the data in batch[0] with it
ExampleState* state = static_cast<ExampleState*>(ctx->state());
int32_t multiplier = checked_cast<const Int32Scalar&>(*state->value).value;
const ArrayData& arg0 = *batch[0].array();
ArrayData* out_arr = out->mutable_array();
const int32_t* arg0_data = arg0.GetValues<int32_t>(1);
int32_t* dst = out_arr->GetMutableValues<int32_t>(1);
for (int64_t i = 0; i < arg0.length; ++i) {
dst[i] = arg0_data[i] * multiplier;
}
return Status::OK();
}
Status ExecAddInt32(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
const Int32Scalar& arg0 = batch[0].scalar_as<Int32Scalar>();
const Int32Scalar& arg1 = batch[1].scalar_as<Int32Scalar>();
out->value = std::make_shared<Int32Scalar>(arg0.value + arg1.value);
return Status::OK();
}
class TestCallScalarFunction : public TestComputeInternals {
protected:
static bool initialized_;
void SetUp() {
TestComputeInternals::SetUp();
if (!initialized_) {
initialized_ = true;
AddCopyFunctions();
AddNoPreallocateFunctions();
AddStatefulFunction();
AddScalarFunction();
}
}
void AddCopyFunctions() {
auto registry = GetFunctionRegistry();
// This function simply copies memory from the input argument into the
// (preallocated) output
auto func =
std::make_shared<ScalarFunction>("test_copy", Arity::Unary(), /*doc=*/nullptr);
// Add a few kernels. Our implementation only accepts arrays
ASSERT_OK(func->AddKernel({InputType::Array(uint8())}, uint8(), ExecCopy));
ASSERT_OK(func->AddKernel({InputType::Array(int32())}, int32(), ExecCopy));
ASSERT_OK(func->AddKernel({InputType::Array(float64())}, float64(), ExecCopy));
ASSERT_OK(registry->AddFunction(func));
// A version which doesn't want the executor to call PropagateNulls
auto func2 = std::make_shared<ScalarFunction>("test_copy_computed_bitmap",
Arity::Unary(), /*doc=*/nullptr);
ScalarKernel kernel({InputType::Array(uint8())}, uint8(), ExecComputedBitmap);
kernel.null_handling = NullHandling::COMPUTED_PREALLOCATE;
ASSERT_OK(func2->AddKernel(kernel));
ASSERT_OK(registry->AddFunction(func2));
}
void AddNoPreallocateFunctions() {
auto registry = GetFunctionRegistry();
// A function that allocates its own output memory. We have cases for both
// non-preallocated data and non-preallocated validity bitmap
auto f1 = std::make_shared<ScalarFunction>("test_nopre_data", Arity::Unary(),
/*doc=*/nullptr);
auto f2 = std::make_shared<ScalarFunction>("test_nopre_validity_or_data",
Arity::Unary(), /*doc=*/nullptr);
ScalarKernel kernel({InputType::Array(uint8())}, uint8(), ExecNoPreallocatedData);
kernel.mem_allocation = MemAllocation::NO_PREALLOCATE;
ASSERT_OK(f1->AddKernel(kernel));
kernel.exec = ExecNoPreallocatedAnything;
kernel.null_handling = NullHandling::COMPUTED_NO_PREALLOCATE;
ASSERT_OK(f2->AddKernel(kernel));
ASSERT_OK(registry->AddFunction(f1));
ASSERT_OK(registry->AddFunction(f2));
}
void AddStatefulFunction() {
auto registry = GetFunctionRegistry();
// This function's behavior depends on a static parameter that is made
// available to the kernel's execution function through its Options object
auto func = std::make_shared<ScalarFunction>("test_stateful", Arity::Unary(),
/*doc=*/nullptr);
ScalarKernel kernel({InputType::Array(int32())}, int32(), ExecStateful, InitStateful);
ASSERT_OK(func->AddKernel(kernel));
ASSERT_OK(registry->AddFunction(func));
}
void AddScalarFunction() {
auto registry = GetFunctionRegistry();
auto func = std::make_shared<ScalarFunction>("test_scalar_add_int32", Arity::Binary(),
/*doc=*/nullptr);
ASSERT_OK(func->AddKernel({InputType::Scalar(int32()), InputType::Scalar(int32())},
int32(), ExecAddInt32));
ASSERT_OK(registry->AddFunction(func));
}
};
bool TestCallScalarFunction::initialized_ = false;
TEST_F(TestCallScalarFunction, ArgumentValidation) {
// Copy accepts only a single array argument
Datum d1(GetInt32Array(10));
// Too many args
std::vector<Datum> args = {d1, d1};
ASSERT_RAISES(Invalid, CallFunction("test_copy", args));
// Too few
args = {};
ASSERT_RAISES(Invalid, CallFunction("test_copy", args));
// Cannot do scalar
args = {Datum(std::make_shared<Int32Scalar>(5))};
ASSERT_RAISES(NotImplemented, CallFunction("test_copy", args));
}
TEST_F(TestCallScalarFunction, PreallocationCases) {
double null_prob = 0.2;
auto arr = GetUInt8Array(1000, null_prob);
auto CheckFunction = [&](std::string func_name) {
ResetContexts();
// The default should be a single array output
{
std::vector<Datum> args = {Datum(arr)};
ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(func_name, args));
ASSERT_EQ(Datum::ARRAY, result.kind());
AssertArraysEqual(*arr, *result.make_array());
}
// Set the exec_chunksize to be smaller, so now we have several invocations
// of the kernel, but still the output is onee array
{
std::vector<Datum> args = {Datum(arr)};
exec_ctx_->set_exec_chunksize(80);
ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(func_name, args, exec_ctx_.get()));
AssertArraysEqual(*arr, *result.make_array());
}
{
// Chunksize not multiple of 8
std::vector<Datum> args = {Datum(arr)};
exec_ctx_->set_exec_chunksize(111);
ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(func_name, args, exec_ctx_.get()));
AssertArraysEqual(*arr, *result.make_array());
}
// Input is chunked, output has one big chunk
{
auto carr = std::shared_ptr<ChunkedArray>(
new ChunkedArray({arr->Slice(0, 100), arr->Slice(100)}));
std::vector<Datum> args = {Datum(carr)};
ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(func_name, args, exec_ctx_.get()));
std::shared_ptr<ChunkedArray> actual = result.chunked_array();
ASSERT_EQ(1, actual->num_chunks());
AssertChunkedEquivalent(*carr, *actual);
}
// Preallocate independently for each batch
{
std::vector<Datum> args = {Datum(arr)};
exec_ctx_->set_preallocate_contiguous(false);
exec_ctx_->set_exec_chunksize(400);
ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(func_name, args, exec_ctx_.get()));
ASSERT_EQ(Datum::CHUNKED_ARRAY, result.kind());
const ChunkedArray& carr = *result.chunked_array();
ASSERT_EQ(3, carr.num_chunks());
AssertArraysEqual(*arr->Slice(0, 400), *carr.chunk(0));
AssertArraysEqual(*arr->Slice(400, 400), *carr.chunk(1));
AssertArraysEqual(*arr->Slice(800), *carr.chunk(2));
}
};
CheckFunction("test_copy");
CheckFunction("test_copy_computed_bitmap");
}
TEST_F(TestCallScalarFunction, BasicNonStandardCases) {
// Test a handful of cases
//
// * Validity bitmap computed by kernel rather than using PropagateNulls
// * Data not pre-allocated
// * Validity bitmap not pre-allocated
double null_prob = 0.2;
auto arr = GetUInt8Array(1000, null_prob);
std::vector<Datum> args = {Datum(arr)};
auto CheckFunction = [&](std::string func_name) {
ResetContexts();
// The default should be a single array output
{
ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(func_name, args));
AssertArraysEqual(*arr, *result.make_array(), true);
}
// Split execution into 3 chunks
{
exec_ctx_->set_exec_chunksize(400);
ASSERT_OK_AND_ASSIGN(Datum result, CallFunction(func_name, args, exec_ctx_.get()));
ASSERT_EQ(Datum::CHUNKED_ARRAY, result.kind());
const ChunkedArray& carr = *result.chunked_array();
ASSERT_EQ(3, carr.num_chunks());
AssertArraysEqual(*arr->Slice(0, 400), *carr.chunk(0));
AssertArraysEqual(*arr->Slice(400, 400), *carr.chunk(1));
AssertArraysEqual(*arr->Slice(800), *carr.chunk(2));
}
};
CheckFunction("test_nopre_data");
CheckFunction("test_nopre_validity_or_data");
}
TEST_F(TestCallScalarFunction, StatefulKernel) {
auto input = ArrayFromJSON(int32(), "[1, 2, 3, null, 5]");
auto multiplier = std::make_shared<Int32Scalar>(2);
auto expected = ArrayFromJSON(int32(), "[2, 4, 6, null, 10]");
ExampleOptions options(multiplier);
std::vector<Datum> args = {Datum(input)};
ASSERT_OK_AND_ASSIGN(Datum result, CallFunction("test_stateful", args, &options));
AssertArraysEqual(*expected, *result.make_array());
}
TEST_F(TestCallScalarFunction, ScalarFunction) {
std::vector<Datum> args = {Datum(std::make_shared<Int32Scalar>(5)),
Datum(std::make_shared<Int32Scalar>(7))};
ASSERT_OK_AND_ASSIGN(Datum result, CallFunction("test_scalar_add_int32", args));
ASSERT_EQ(Datum::SCALAR, result.kind());
auto expected = std::make_shared<Int32Scalar>(12);
ASSERT_TRUE(expected->Equals(*result.scalar()));
}
} // namespace detail
} // namespace compute
} // namespace arrow