blob: 8df5a6a6fc6676c1ddf0f2cee43b978df3e4370c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package array_test
import (
"fmt"
"reflect"
"testing"
"github.com/apache/arrow/go/arrow"
"github.com/apache/arrow/go/arrow/array"
"github.com/apache/arrow/go/arrow/memory"
)
func TestRecord(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer mem.AssertSize(t, 0)
schema := arrow.NewSchema(
[]arrow.Field{
arrow.Field{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32},
arrow.Field{Name: "f2-f64", Type: arrow.PrimitiveTypes.Float64},
},
nil,
)
col1 := func() array.Interface {
ib := array.NewInt32Builder(mem)
defer ib.Release()
ib.AppendValues([]int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)
return ib.NewInt32Array()
}()
defer col1.Release()
col2 := func() array.Interface {
b := array.NewFloat64Builder(mem)
defer b.Release()
b.AppendValues([]float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)
return b.NewFloat64Array()
}()
defer col2.Release()
cols := []array.Interface{col1, col2}
rec := array.NewRecord(schema, cols, -1)
defer rec.Release()
rec.Retain()
rec.Release()
if got, want := rec.Schema(), schema; !got.Equal(want) {
t.Fatalf("invalid schema: got=%#v, want=%#v", got, want)
}
if got, want := rec.NumRows(), int64(10); got != want {
t.Fatalf("invalid number of rows: got=%d, want=%d", got, want)
}
if got, want := rec.NumCols(), int64(2); got != want {
t.Fatalf("invalid number of columns: got=%d, want=%d", got, want)
}
if got, want := rec.Columns()[0], cols[0]; got != want {
t.Fatalf("invalid column: got=%q, want=%q", got, want)
}
if got, want := rec.Column(0), cols[0]; got != want {
t.Fatalf("invalid column: got=%q, want=%q", got, want)
}
if got, want := rec.ColumnName(0), schema.Field(0).Name; got != want {
t.Fatalf("invalid column name: got=%q, want=%q", got, want)
}
for _, tc := range []struct {
i, j int64
err error
}{
{i: 0, j: 10, err: nil},
{i: 1, j: 10, err: nil},
{i: 1, j: 9, err: nil},
{i: 0, j: 0, err: nil},
{i: 1, j: 1, err: nil},
{i: 10, j: 10, err: nil},
{i: 1, j: 0, err: fmt.Errorf("arrow/array: index out of range")},
{i: 1, j: 11, err: fmt.Errorf("arrow/array: index out of range")},
} {
t.Run(fmt.Sprintf("slice-%02d-%02d", tc.i, tc.j), func(t *testing.T) {
if tc.err != nil {
defer func() {
e := recover()
if e == nil {
t.Fatalf("expected an error %q", tc.err)
}
switch err := e.(type) {
case string:
if err != tc.err.Error() {
t.Fatalf("invalid panic message. got=%q, want=%q", err, tc.err)
}
case error:
if err.Error() != tc.err.Error() {
t.Fatalf("invalid panic message. got=%q, want=%q", err, tc.err)
}
default:
t.Fatalf("invalid type for panic message: %T (err=%v)", err, err)
}
}()
}
sub := rec.NewSlice(tc.i, tc.j)
defer sub.Release()
if got, want := sub.NumRows(), tc.j-tc.i; got != want {
t.Fatalf("invalid rec-slice number of rows: got=%d, want=%d", got, want)
}
})
}
for _, tc := range []struct {
schema *arrow.Schema
cols []array.Interface
rows int64
err error
}{
{
schema: schema,
cols: nil,
rows: -1,
err: fmt.Errorf("arrow/array: number of columns/fields mismatch"),
},
{
schema: schema,
cols: cols[:1],
rows: 0,
err: fmt.Errorf("arrow/array: number of columns/fields mismatch"),
},
{
schema: arrow.NewSchema(
[]arrow.Field{
arrow.Field{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32},
},
nil,
),
cols: cols,
rows: 0,
err: fmt.Errorf("arrow/array: number of columns/fields mismatch"),
},
{
schema: arrow.NewSchema(
[]arrow.Field{
arrow.Field{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32},
arrow.Field{Name: "f2-f64", Type: arrow.PrimitiveTypes.Int32},
},
nil,
),
cols: cols,
rows: 0,
err: fmt.Errorf(`arrow/array: column "f2-f64" type mismatch: got=float64, want=int32`),
},
{
schema: schema,
cols: cols,
rows: 11,
err: fmt.Errorf(`arrow/array: mismatch number of rows in column "f1-i32": got=10, want=11`),
},
{
schema: schema,
cols: cols,
rows: 10,
err: nil,
},
{
schema: schema,
cols: cols,
rows: 3,
err: nil,
},
{
schema: schema,
cols: cols,
rows: 0,
err: nil,
},
} {
t.Run("", func(t *testing.T) {
if tc.err != nil {
defer func() {
e := recover()
if e == nil {
t.Fatalf("expected an error %q", tc.err)
}
switch err := e.(type) {
case string:
if err != tc.err.Error() {
t.Fatalf("invalid panic message. got=%q, want=%q", err, tc.err)
}
case error:
if err.Error() != tc.err.Error() {
t.Fatalf("invalid panic message. got=%q, want=%q", err, tc.err)
}
default:
t.Fatalf("invalid type for panic message: %T (err=%v)", err, err)
}
}()
}
rec := array.NewRecord(tc.schema, tc.cols, tc.rows)
defer rec.Release()
if got, want := rec.NumRows(), tc.rows; got != want {
t.Fatalf("invalid number of rows: got=%d, want=%d", got, want)
}
})
}
}
func TestRecordReader(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer mem.AssertSize(t, 0)
schema := arrow.NewSchema(
[]arrow.Field{
arrow.Field{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32},
arrow.Field{Name: "f2-f64", Type: arrow.PrimitiveTypes.Float64},
},
nil,
)
rec1 := func() array.Record {
col1 := func() array.Interface {
ib := array.NewInt32Builder(mem)
defer ib.Release()
ib.AppendValues([]int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)
return ib.NewInt32Array()
}()
defer col1.Release()
col2 := func() array.Interface {
b := array.NewFloat64Builder(mem)
defer b.Release()
b.AppendValues([]float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)
return b.NewFloat64Array()
}()
defer col2.Release()
cols := []array.Interface{col1, col2}
return array.NewRecord(schema, cols, -1)
}()
defer rec1.Release()
rec2 := func() array.Record {
col1 := func() array.Interface {
ib := array.NewInt32Builder(mem)
defer ib.Release()
ib.AppendValues([]int32{11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, nil)
return ib.NewInt32Array()
}()
defer col1.Release()
col2 := func() array.Interface {
b := array.NewFloat64Builder(mem)
defer b.Release()
b.AppendValues([]float64{11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, nil)
return b.NewFloat64Array()
}()
defer col2.Release()
cols := []array.Interface{col1, col2}
return array.NewRecord(schema, cols, -1)
}()
defer rec2.Release()
recs := []array.Record{rec1, rec2}
itr, err := array.NewRecordReader(schema, recs)
if err != nil {
t.Fatal(err)
}
defer itr.Release()
itr.Retain()
itr.Release()
if got, want := itr.Schema(), schema; !got.Equal(want) {
t.Fatalf("invalid schema. got=%#v, want=%#v", got, want)
}
n := 0
for itr.Next() {
n++
if got, want := itr.Record(), recs[n-1]; !reflect.DeepEqual(got, want) {
t.Fatalf("itr[%d], invalid record. got=%#v, want=%#v", n-1, got, want)
}
}
if n != len(recs) {
t.Fatalf("invalid number of iterations. got=%d, want=%d", n, len(recs))
}
for _, tc := range []struct {
name string
schema *arrow.Schema
err error
}{
{
name: "mismatch-name",
schema: arrow.NewSchema(
[]arrow.Field{
arrow.Field{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32},
arrow.Field{Name: "f2-XXX", Type: arrow.PrimitiveTypes.Float64},
},
nil,
),
err: fmt.Errorf("arrow/array: mismatch schema"),
},
{
name: "mismatch-type",
schema: arrow.NewSchema(
[]arrow.Field{
arrow.Field{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32},
arrow.Field{Name: "f2-f64", Type: arrow.PrimitiveTypes.Int64},
},
nil,
),
err: fmt.Errorf("arrow/array: mismatch schema"),
},
} {
t.Run(tc.name, func(t *testing.T) {
itr, err := array.NewRecordReader(tc.schema, recs)
if itr != nil {
itr.Release()
}
if err == nil {
t.Fatalf("expected an error: %v", tc.err)
}
if !reflect.DeepEqual(tc.err, err) {
t.Fatalf("invalid error: got=%v, want=%v", err, tc.err)
}
})
}
}
func TestRecordBuilder(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer mem.AssertSize(t, 0)
schema := arrow.NewSchema(
[]arrow.Field{
arrow.Field{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32},
arrow.Field{Name: "f2-f64", Type: arrow.PrimitiveTypes.Float64},
},
nil,
)
b := array.NewRecordBuilder(mem, schema)
defer b.Release()
b.Retain()
b.Release()
b.Field(0).(*array.Int32Builder).AppendValues([]int32{1, 2, 3, 4, 5, 6}, nil)
b.Field(0).(*array.Int32Builder).AppendValues([]int32{7, 8, 9, 10}, nil)
b.Field(1).(*array.Float64Builder).AppendValues([]float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)
rec := b.NewRecord()
defer rec.Release()
if got, want := rec.Schema(), schema; !got.Equal(want) {
t.Fatalf("invalid schema: got=%#v, want=%#v", got, want)
}
if got, want := rec.NumRows(), int64(10); got != want {
t.Fatalf("invalid number of rows: got=%d, want=%d", got, want)
}
if got, want := rec.NumCols(), int64(2); got != want {
t.Fatalf("invalid number of columns: got=%d, want=%d", got, want)
}
if got, want := rec.ColumnName(0), schema.Field(0).Name; got != want {
t.Fatalf("invalid column name: got=%q, want=%q", got, want)
}
}
type testMessage struct {
Foo *testMessageFoo
Bars []*testMessageBar
}
func (m *testMessage) Reset() { *m = testMessage{} }
func (m *testMessage) GetFoo() *testMessageFoo {
if m != nil {
return m.Foo
}
return nil
}
func (m *testMessage) GetBars() []*testMessageBar {
if m != nil {
return m.Bars
}
return nil
}
type testMessageFoo struct {
A int32
B []uint32
}
func (m *testMessageFoo) Reset() { *m = testMessageFoo{} }
func (m *testMessageFoo) GetA() int32 {
if m != nil {
return m.A
}
return 0
}
func (m *testMessageFoo) GetB() []uint32 {
if m != nil {
return m.B
}
return nil
}
type testMessageBar struct {
C int64
D []uint64
}
func (m *testMessageBar) Reset() { *m = testMessageBar{} }
func (m *testMessageBar) GetC() int64 {
if m != nil {
return m.C
}
return 0
}
func (m *testMessageBar) GetD() []uint64 {
if m != nil {
return m.D
}
return nil
}
var testMessageSchema = arrow.NewSchema(
[]arrow.Field{
arrow.Field{Name: "foo", Type: arrow.StructOf(
arrow.Field{Name: "a", Type: arrow.PrimitiveTypes.Int32},
arrow.Field{Name: "b", Type: arrow.ListOf(
arrow.PrimitiveTypes.Uint32,
)},
)},
arrow.Field{Name: "bars", Type: arrow.ListOf(
arrow.StructOf(
arrow.Field{Name: "c", Type: arrow.PrimitiveTypes.Int64},
arrow.Field{Name: "d", Type: arrow.ListOf(
arrow.PrimitiveTypes.Uint64,
)},
),
)},
},
nil,
)
func (m *testMessage) Fill(rec array.Record, row int) error {
m.Reset()
// foo
if 0 < rec.NumCols() {
src0 := rec.Column(0).Data()
typedSrc0 := array.NewStructData(src0)
defer typedSrc0.Release()
if typedSrc0.IsValid(row) {
m0 := &testMessageFoo{}
{
// a
if 0 < typedSrc0.NumField() {
src0_0 := typedSrc0.Field(0).Data()
typedSrc0_0 := array.NewInt32Data(src0_0)
defer typedSrc0_0.Release()
m0.A = typedSrc0_0.Value(row)
}
// b
if 1 < typedSrc0.NumField() {
src0_1 := typedSrc0.Field(1).Data()
listSrc0_1 := array.NewListData(src0_1)
defer listSrc0_1.Release()
if listSrc0_1.IsValid(row) {
typedSrc0_1 := array.NewUint32Data(listSrc0_1.ListValues().Data())
typedSrc0_1.Release()
start0_1 := int(listSrc0_1.Offsets()[row])
end0_1 := int(listSrc0_1.Offsets()[row+1])
for row := start0_1; row < end0_1; row++ {
m0.B = append(m0.B, typedSrc0_1.Value(row))
}
}
}
}
m.Foo = m0
}
}
// bars
if 1 < rec.NumCols() {
src1 := rec.Column(1).Data()
listSrc1 := array.NewListData(src1)
defer listSrc1.Release()
if listSrc1.IsValid(row) {
typedSrc1 := array.NewStructData(listSrc1.ListValues().Data())
defer typedSrc1.Release()
start1 := int(listSrc1.Offsets()[row])
end1 := int(listSrc1.Offsets()[row+1])
for row := start1; row < end1; row++ {
if typedSrc1.IsValid(row) {
m1 := &testMessageBar{}
{
// c
if 0 < typedSrc1.NumField() {
src1_0 := typedSrc1.Field(0).Data()
typedSrc1_0 := array.NewInt64Data(src1_0)
defer typedSrc1_0.Release()
m1.C = typedSrc1_0.Value(row)
}
// d
if 1 < typedSrc1.NumField() {
src1_1 := typedSrc1.Field(1).Data()
listSrc1_1 := array.NewListData(src1_1)
defer listSrc1_1.Release()
if listSrc1_1.IsValid(row) {
typedSrc1_1 := array.NewUint64Data(listSrc1_1.ListValues().Data())
defer typedSrc1_1.Release()
start1_1 := int(listSrc1_1.Offsets()[row])
end1_1 := int(listSrc1_1.Offsets()[row+1])
for row := start1_1; row < end1_1; row++ {
m1.D = append(m1.D, typedSrc1_1.Value(row))
}
}
}
}
m.Bars = append(m.Bars, m1)
} else {
m.Bars = append(m.Bars, nil)
}
}
}
}
return nil
}
func newTestMessageArrowRecordBuilder(mem memory.Allocator) *testMessageArrowRecordBuilder {
return &testMessageArrowRecordBuilder{
rb: array.NewRecordBuilder(mem, testMessageSchema),
}
}
type testMessageArrowRecordBuilder struct {
rb *array.RecordBuilder
}
func (b *testMessageArrowRecordBuilder) Build() array.Record {
return b.rb.NewRecord()
}
func (b *testMessageArrowRecordBuilder) Release() {
b.rb.Release()
}
func (b *testMessageArrowRecordBuilder) Append(m *testMessage) {
// foo
{
builder0 := b.rb.Field(0)
v0 := m.GetFoo()
valueBuilder0 := builder0.(*array.StructBuilder)
if v0 == nil {
valueBuilder0.AppendNull()
} else {
valueBuilder0.Append(true)
// a
{
v0_0 := v0.GetA()
builder0_0 := valueBuilder0.FieldBuilder(0)
valueBuilder0_0 := builder0_0.(*array.Int32Builder)
valueBuilder0_0.Append(v0_0)
}
// b
{
v0_1 := v0.GetB()
builder0_1 := valueBuilder0.FieldBuilder(1)
listBuilder0_1 := builder0_1.(*array.ListBuilder)
if len(v0_1) == 0 {
listBuilder0_1.AppendNull()
} else {
listBuilder0_1.Append(true)
valueBuilder0_1 := listBuilder0_1.ValueBuilder().(*array.Uint32Builder)
for _, item := range v0_1 {
valueBuilder0_1.Append(item)
}
}
}
}
}
// bars
{
builder1 := b.rb.Field(1)
v1 := m.GetBars()
listBuilder1 := builder1.(*array.ListBuilder)
if len(v1) == 0 {
listBuilder1.AppendNull()
} else {
listBuilder1.Append(true)
valueBuilder1 := listBuilder1.ValueBuilder().(*array.StructBuilder)
for _, item := range v1 {
if item == nil {
valueBuilder1.AppendNull()
} else {
valueBuilder1.Append(true)
// c
{
v1_0 := item.GetC()
builder1_0 := valueBuilder1.FieldBuilder(0)
valueBuilder1_0 := builder1_0.(*array.Int64Builder)
valueBuilder1_0.Append(v1_0)
}
// d
{
v1_1 := item.GetD()
builder1_1 := valueBuilder1.FieldBuilder(1)
listBuilder1_1 := builder1_1.(*array.ListBuilder)
if len(v1_1) == 0 {
listBuilder1_1.AppendNull()
} else {
listBuilder1_1.Append(true)
valueBuilder1_1 := listBuilder1_1.ValueBuilder().(*array.Uint64Builder)
for _, item := range v1_1 {
valueBuilder1_1.Append(item)
}
}
}
}
}
}
}
}
func TestRecordBuilderMessages(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer mem.AssertSize(t, 0)
b := newTestMessageArrowRecordBuilder(mem)
defer b.Release()
var msgs []*testMessage
for i := 0; i < 1000; i++ {
msg := &testMessage{
Foo: &testMessageFoo{
A: int32(i),
B: []uint32{2, 3, 4, 5, 6, 7, 8, 9},
},
Bars: []*testMessageBar{
&testMessageBar{
C: 11,
D: []uint64{12, 13, 14},
},
&testMessageBar{
C: 15,
D: []uint64{16, 17, 18, 19},
},
nil,
&testMessageBar{
C: 20,
D: []uint64{21},
},
},
}
msgs = append(msgs, msg)
b.Append(msg)
}
rec := b.Build()
defer rec.Release()
var got testMessage
for i := 0; i < 1000; i++ {
got.Fill(rec, i)
if !reflect.DeepEqual(&got, msgs[i]) {
t.Fatalf("row[%d], invalid record. got=%#v, want=%#v", i, &got, msgs[i])
}
}
}