blob: 30bfbc0246a1c872c687a1e7f567646ee61a5580 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build go1.18
package compute
import (
func ExecCopyArray(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
debug.Assert(len(batch.Values) == 1, "wrong number of values")
valueSize := int64(batch.Values[0].Type().(arrow.FixedWidthDataType).BitWidth() / 8)
arg0 := batch.Values[0].Array
dst := out.Buffers[1].Buf[out.Offset*valueSize:]
src := arg0.Buffers[1].Buf[arg0.Offset*valueSize:]
copy(dst, src[:batch.Len*valueSize])
return nil
func ExecComputedBitmap(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
// propagate nulls not used. check that out bitmap isn't the same already
// as the input bitmap
arg0 := batch.Values[0].Array
if bitutil.CountSetBits(arg0.Buffers[1].Buf, int(arg0.Offset), int(batch.Len)) > 0 {
// check that the bitmap hasn't already been copied
debug.Assert(!bitutil.BitmapEquals(arg0.Buffers[0].Buf, out.Buffers[0].Buf,
arg0.Offset, out.Offset, batch.Len), "bitmap should not have already been copied")
bitutil.CopyBitmap(arg0.Buffers[0].Buf, int(arg0.Offset), int(batch.Len), out.Buffers[0].Buf, int(out.Offset))
return ExecCopyArray(ctx, batch, out)
func ExecNoPreallocatedData(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
// validity preallocated, not data
debug.Assert(out.Offset == 0, "invalid offset for non-prealloc")
valueSize := int64(batch.Values[0].Type().(arrow.FixedWidthDataType).BitWidth() / 8)
out.Buffers[1].SetBuffer(ctx.Allocate(int(out.Len * valueSize)))
out.Buffers[1].SelfAlloc = true
return ExecCopyArray(ctx, batch, out)
func ExecNoPreallocatedAnything(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
// neither validity nor data preallocated
debug.Assert(out.Offset == 0, "invalid offset for non-prealloc")
out.Buffers[0].SelfAlloc = true
arg0 := batch.Values[0].Array
bitutil.CopyBitmap(arg0.Buffers[0].Buf, int(arg0.Offset), int(batch.Len), out.Buffers[0].Buf, 0)
// reuse kernel that allocates data
return ExecNoPreallocatedData(ctx, batch, out)
type ExampleOptions struct {
Value scalar.Scalar
func (e *ExampleOptions) TypeName() string { return "example" }
type ExampleState struct {
Value scalar.Scalar
func InitStateful(_ *exec.KernelCtx, args exec.KernelInitArgs) (exec.KernelState, error) {
value := args.Options.(*ExampleOptions).Value
return &ExampleState{Value: value}, nil
func ExecStateful(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
state := ctx.State.(*ExampleState)
multiplier := state.Value.(*scalar.Int32).Value
arg0 := batch.Values[0].Array
arg0Data := exec.GetSpanValues[int32](&arg0, 1)
dst := exec.GetSpanValues[int32](out, 1)
for i, v := range arg0Data {
dst[i] = v * multiplier
return nil
func ExecAddInt32(ctx *exec.KernelCtx, batch *exec.ExecSpan, out *exec.ExecResult) error {
left := exec.GetSpanValues[int32](&batch.Values[0].Array, 1)
right := exec.GetSpanValues[int32](&batch.Values[1].Array, 1)
outValues := exec.GetSpanValues[int32](out, 1)
for i := 0; i < int(batch.Len); i++ {
outValues[i] = left[i] + right[i]
return nil
type CallScalarFuncSuite struct {
func (c *CallScalarFuncSuite) addCopyFuncs() {
registry = GetFunctionRegistry()
fn := NewScalarFunction("test_copy", Unary(), EmptyFuncDoc)
types := []arrow.DataType{arrow.PrimitiveTypes.Uint8, arrow.PrimitiveTypes.Int32, arrow.PrimitiveTypes.Float64}
for _, t := range types {
exec.NewOutputType(t), ExecCopyArray, nil))
c.True(registry.AddFunction(fn, false))
// a version which doesn't want the executor to call propagatenulls
fn2 := NewScalarFunction("test_copy_computed_bitmap", Unary(), EmptyFuncDoc)
kernel := exec.NewScalarKernel([]exec.InputType{exec.NewExactInput(arrow.PrimitiveTypes.Uint8)},
exec.NewOutputType(arrow.PrimitiveTypes.Uint8), ExecComputedBitmap, nil)
kernel.NullHandling = exec.NullComputedPrealloc
c.True(registry.AddFunction(fn2, false))
func (c *CallScalarFuncSuite) addNoPreallocFuncs() {
registry = GetFunctionRegistry()
// a function that allocates its own output memory. we have cases
// for both non-preallocated data and non-preallocated bitmap
f1 := NewScalarFunction("test_nopre_data", Unary(), EmptyFuncDoc)
f2 := NewScalarFunction("test_nopre_validity_or_data", Unary(), EmptyFuncDoc)
kernel := exec.NewScalarKernel(
ExecNoPreallocatedData, nil)
kernel.MemAlloc = exec.MemNoPrealloc
kernel.ExecFn = ExecNoPreallocatedAnything
kernel.NullHandling = exec.NullComputedNoPrealloc
c.True(registry.AddFunction(f1, false))
c.True(registry.AddFunction(f2, false))
func (c *CallScalarFuncSuite) addStatefulFunc() {
registry := GetFunctionRegistry()
// this functions behavior depends on a static parameter that
// is made available to the execution through its options object
fn := NewScalarFunction("test_stateful", Unary(), EmptyFuncDoc)
exec.NewOutputType(arrow.PrimitiveTypes.Int32), ExecStateful, InitStateful))
c.True(registry.AddFunction(fn, false))
func (c *CallScalarFuncSuite) addScalarFunc() {
registry := GetFunctionRegistry()
fn := NewScalarFunction("test_scalar_add_int32", Binary(), EmptyFuncDoc)
exec.NewOutputType(arrow.PrimitiveTypes.Int32), ExecAddInt32, nil))
c.True(registry.AddFunction(fn, false))
func (c *CallScalarFuncSuite) SetupSuite() {
func (c *CallScalarFuncSuite) TestArgumentValidation() {
// copy accepts only a single array arg
arr := c.getInt32Arr(10, 0.1)
defer arr.Release()
d1 := &ArrayDatum{Value: arr.Data()}
c.Run("too many args", func() {
args := []Datum{d1, d1}
_, err := CallFunction(c.ctx.Ctx, "test_copy", nil, args...)
c.ErrorIs(err, arrow.ErrInvalid)
c.Run("too few args", func() {
_, err := CallFunction(c.ctx.Ctx, "test_copy", nil)
c.ErrorIs(err, arrow.ErrInvalid)
d1Scalar := NewDatum(int32(5))
result, err := CallFunction(c.ctx.Ctx, "test_copy", nil, d1)
result, err = CallFunction(c.ctx.Ctx, "test_copy", nil, d1Scalar)
func (c *CallScalarFuncSuite) TestPreallocationCases() {
nullProb := float64(0.2)
arr := c.getUint8Arr(100, nullProb)
defer arr.Release()
funcNames := []string{"test_copy", "test_copy_computed_bitmap"}
for _, funcName := range funcNames {
c.Run(funcName, func() {
c.Run("single output default", func() {
result, err := CallFunction(c.ctx.Ctx, funcName, nil, &ArrayDatum{arr.Data()})
defer result.Release()
c.Equal(KindArray, result.Kind())
c.assertDatumEqual(arr, result)
c.Run("exec chunks", func() {
// set the exec_chunksize to be smaller so now we have
// several invocations of the kernel,
// but still only one output array
c.execCtx.ChunkSize = 80
result, err := CallFunction(SetExecCtx(c.ctx.Ctx, c.execCtx), funcName, nil, &ArrayDatum{arr.Data()})
defer result.Release()
c.Equal(KindArray, result.Kind())
c.assertDatumEqual(arr, result)
c.Run("not multiple 8 chunk", func() {
// chunksize is not a multiple of 8
c.execCtx.ChunkSize = 11
result, err := CallFunction(SetExecCtx(c.ctx.Ctx, c.execCtx), funcName, nil, &ArrayDatum{arr.Data()})
defer result.Release()
c.Equal(KindArray, result.Kind())
c.assertDatumEqual(arr, result)
c.Run("chunked", func() {
// input is chunked, output is one big chunk
chk1, chk2 := array.NewSlice(arr, 0, 10), array.NewSlice(arr, 10, int64(arr.Len()))
defer chk1.Release()
defer chk2.Release()
carr := arrow.NewChunked(arr.DataType(), []arrow.Array{chk1, chk2})
defer carr.Release()
result, err := CallFunction(SetExecCtx(c.ctx.Ctx, c.execCtx), funcName, nil, &ChunkedDatum{carr})
defer result.Release()
c.Equal(KindChunked, result.Kind())
actual := result.(*ChunkedDatum).Value
c.Len(actual.Chunks(), 1)
c.Truef(array.ChunkedEqual(actual, carr), "expected: %s\ngot: %s", carr, actual)
c.Run("independent", func() {
// preallocate independently for each batch
c.execCtx.PreallocContiguous = false
c.execCtx.ChunkSize = 40
result, err := CallFunction(SetExecCtx(c.ctx.Ctx, c.execCtx), funcName, nil, &ArrayDatum{arr.Data()})
defer result.Release()
c.Equal(KindChunked, result.Kind())
carr := result.(*ChunkedDatum).Value
c.Len(carr.Chunks(), 3)
sl := array.NewSlice(arr, 0, 40)
defer sl.Release()
c.assertArrayEqual(sl, carr.Chunk(0))
sl = array.NewSlice(arr, 40, 80)
defer sl.Release()
c.assertArrayEqual(sl, carr.Chunk(1))
sl = array.NewSlice(arr, 80, int64(arr.Len()))
defer sl.Release()
c.assertArrayEqual(sl, carr.Chunk(2))
func (c *CallScalarFuncSuite) TestBasicNonStandardCases() {
// test some more cases
// * validity bitmap computed by kernel rather than propagate nulls
// * data not pre-allocated
// * validity bitmap not pre-allocated
nullProb := float64(0.2)
arr := c.getUint8Arr(1000, nullProb)
defer arr.Release()
args := []Datum{&ArrayDatum{arr.Data()}}
for _, funcName := range []string{"test_nopre_data", "test_nopre_validity_or_data"} {
c.Run("funcName", func() {
c.Run("single output default", func() {
result, err := CallFunction(c.ctx.Ctx, funcName, nil, args...)
defer result.Release()
c.Equal(KindArray, result.Kind())
c.assertDatumEqual(arr, result)
c.Run("split into 3 chunks", func() {
c.execCtx.ChunkSize = 400
result, err := CallFunction(SetExecCtx(c.ctx.Ctx, c.execCtx), funcName, nil, args...)
defer result.Release()
c.Equal(KindChunked, result.Kind())
carr := result.(*ChunkedDatum).Value
c.Len(carr.Chunks(), 3)
sl := array.NewSlice(arr, 0, 400)
defer sl.Release()
c.assertArrayEqual(sl, carr.Chunk(0))
sl = array.NewSlice(arr, 400, 800)
defer sl.Release()
c.assertArrayEqual(sl, carr.Chunk(1))
sl = array.NewSlice(arr, 800, int64(arr.Len()))
defer sl.Release()
c.assertArrayEqual(sl, carr.Chunk(2))
func (c *CallScalarFuncSuite) TestStatefulKernel() {
input, _, _ := array.FromJSON(c.mem, arrow.PrimitiveTypes.Int32, strings.NewReader(`[1, 2, 3, null, 5]`))
defer input.Release()
multiplier := scalar.MakeScalar(int32(2))
expected, _, _ := array.FromJSON(c.mem, arrow.PrimitiveTypes.Int32, strings.NewReader(`[2, 4, 6, null, 10]`))
defer expected.Release()
options := &ExampleOptions{multiplier}
result, err := CallFunction(c.ctx.Ctx, "test_stateful", options, &ArrayDatum{input.Data()})
defer result.Release()
c.assertDatumEqual(expected, result)
func (c *CallScalarFuncSuite) TestScalarFunction() {
args := []Datum{NewDatum(int32(5)), NewDatum(int32(7))}
result, err := CallFunction(c.ctx.Ctx, "test_scalar_add_int32", nil, args...)
defer result.Release()
c.Equal(KindScalar, result.Kind())
expected := scalar.MakeScalar(int32(12))
c.True(scalar.Equals(expected, result.(*ScalarDatum).Value))
func TestCallScalarFunctions(t *testing.T) {
suite.Run(t, new(CallScalarFuncSuite))