blob: 8df5a6a6fc6676c1ddf0f2cee43b978df3e4370c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package array_test
import (
func TestRecord(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer mem.AssertSize(t, 0)
schema := arrow.NewSchema(
arrow.Field{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32},
arrow.Field{Name: "f2-f64", Type: arrow.PrimitiveTypes.Float64},
col1 := func() array.Interface {
ib := array.NewInt32Builder(mem)
defer ib.Release()
ib.AppendValues([]int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)
return ib.NewInt32Array()
defer col1.Release()
col2 := func() array.Interface {
b := array.NewFloat64Builder(mem)
defer b.Release()
b.AppendValues([]float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)
return b.NewFloat64Array()
defer col2.Release()
cols := []array.Interface{col1, col2}
rec := array.NewRecord(schema, cols, -1)
defer rec.Release()
if got, want := rec.Schema(), schema; !got.Equal(want) {
t.Fatalf("invalid schema: got=%#v, want=%#v", got, want)
if got, want := rec.NumRows(), int64(10); got != want {
t.Fatalf("invalid number of rows: got=%d, want=%d", got, want)
if got, want := rec.NumCols(), int64(2); got != want {
t.Fatalf("invalid number of columns: got=%d, want=%d", got, want)
if got, want := rec.Columns()[0], cols[0]; got != want {
t.Fatalf("invalid column: got=%q, want=%q", got, want)
if got, want := rec.Column(0), cols[0]; got != want {
t.Fatalf("invalid column: got=%q, want=%q", got, want)
if got, want := rec.ColumnName(0), schema.Field(0).Name; got != want {
t.Fatalf("invalid column name: got=%q, want=%q", got, want)
for _, tc := range []struct {
i, j int64
err error
{i: 0, j: 10, err: nil},
{i: 1, j: 10, err: nil},
{i: 1, j: 9, err: nil},
{i: 0, j: 0, err: nil},
{i: 1, j: 1, err: nil},
{i: 10, j: 10, err: nil},
{i: 1, j: 0, err: fmt.Errorf("arrow/array: index out of range")},
{i: 1, j: 11, err: fmt.Errorf("arrow/array: index out of range")},
} {
t.Run(fmt.Sprintf("slice-%02d-%02d", tc.i, tc.j), func(t *testing.T) {
if tc.err != nil {
defer func() {
e := recover()
if e == nil {
t.Fatalf("expected an error %q", tc.err)
switch err := e.(type) {
case string:
if err != tc.err.Error() {
t.Fatalf("invalid panic message. got=%q, want=%q", err, tc.err)
case error:
if err.Error() != tc.err.Error() {
t.Fatalf("invalid panic message. got=%q, want=%q", err, tc.err)
t.Fatalf("invalid type for panic message: %T (err=%v)", err, err)
sub := rec.NewSlice(tc.i, tc.j)
defer sub.Release()
if got, want := sub.NumRows(), tc.j-tc.i; got != want {
t.Fatalf("invalid rec-slice number of rows: got=%d, want=%d", got, want)
for _, tc := range []struct {
schema *arrow.Schema
cols []array.Interface
rows int64
err error
schema: schema,
cols: nil,
rows: -1,
err: fmt.Errorf("arrow/array: number of columns/fields mismatch"),
schema: schema,
cols: cols[:1],
rows: 0,
err: fmt.Errorf("arrow/array: number of columns/fields mismatch"),
schema: arrow.NewSchema(
arrow.Field{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32},
cols: cols,
rows: 0,
err: fmt.Errorf("arrow/array: number of columns/fields mismatch"),
schema: arrow.NewSchema(
arrow.Field{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32},
arrow.Field{Name: "f2-f64", Type: arrow.PrimitiveTypes.Int32},
cols: cols,
rows: 0,
err: fmt.Errorf(`arrow/array: column "f2-f64" type mismatch: got=float64, want=int32`),
schema: schema,
cols: cols,
rows: 11,
err: fmt.Errorf(`arrow/array: mismatch number of rows in column "f1-i32": got=10, want=11`),
schema: schema,
cols: cols,
rows: 10,
err: nil,
schema: schema,
cols: cols,
rows: 3,
err: nil,
schema: schema,
cols: cols,
rows: 0,
err: nil,
} {
t.Run("", func(t *testing.T) {
if tc.err != nil {
defer func() {
e := recover()
if e == nil {
t.Fatalf("expected an error %q", tc.err)
switch err := e.(type) {
case string:
if err != tc.err.Error() {
t.Fatalf("invalid panic message. got=%q, want=%q", err, tc.err)
case error:
if err.Error() != tc.err.Error() {
t.Fatalf("invalid panic message. got=%q, want=%q", err, tc.err)
t.Fatalf("invalid type for panic message: %T (err=%v)", err, err)
rec := array.NewRecord(tc.schema, tc.cols, tc.rows)
defer rec.Release()
if got, want := rec.NumRows(), tc.rows; got != want {
t.Fatalf("invalid number of rows: got=%d, want=%d", got, want)
func TestRecordReader(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer mem.AssertSize(t, 0)
schema := arrow.NewSchema(
arrow.Field{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32},
arrow.Field{Name: "f2-f64", Type: arrow.PrimitiveTypes.Float64},
rec1 := func() array.Record {
col1 := func() array.Interface {
ib := array.NewInt32Builder(mem)
defer ib.Release()
ib.AppendValues([]int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)
return ib.NewInt32Array()
defer col1.Release()
col2 := func() array.Interface {
b := array.NewFloat64Builder(mem)
defer b.Release()
b.AppendValues([]float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)
return b.NewFloat64Array()
defer col2.Release()
cols := []array.Interface{col1, col2}
return array.NewRecord(schema, cols, -1)
defer rec1.Release()
rec2 := func() array.Record {
col1 := func() array.Interface {
ib := array.NewInt32Builder(mem)
defer ib.Release()
ib.AppendValues([]int32{11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, nil)
return ib.NewInt32Array()
defer col1.Release()
col2 := func() array.Interface {
b := array.NewFloat64Builder(mem)
defer b.Release()
b.AppendValues([]float64{11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, nil)
return b.NewFloat64Array()
defer col2.Release()
cols := []array.Interface{col1, col2}
return array.NewRecord(schema, cols, -1)
defer rec2.Release()
recs := []array.Record{rec1, rec2}
itr, err := array.NewRecordReader(schema, recs)
if err != nil {
defer itr.Release()
if got, want := itr.Schema(), schema; !got.Equal(want) {
t.Fatalf("invalid schema. got=%#v, want=%#v", got, want)
n := 0
for itr.Next() {
if got, want := itr.Record(), recs[n-1]; !reflect.DeepEqual(got, want) {
t.Fatalf("itr[%d], invalid record. got=%#v, want=%#v", n-1, got, want)
if n != len(recs) {
t.Fatalf("invalid number of iterations. got=%d, want=%d", n, len(recs))
for _, tc := range []struct {
name string
schema *arrow.Schema
err error
name: "mismatch-name",
schema: arrow.NewSchema(
arrow.Field{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32},
arrow.Field{Name: "f2-XXX", Type: arrow.PrimitiveTypes.Float64},
err: fmt.Errorf("arrow/array: mismatch schema"),
name: "mismatch-type",
schema: arrow.NewSchema(
arrow.Field{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32},
arrow.Field{Name: "f2-f64", Type: arrow.PrimitiveTypes.Int64},
err: fmt.Errorf("arrow/array: mismatch schema"),
} {
t.Run(, func(t *testing.T) {
itr, err := array.NewRecordReader(tc.schema, recs)
if itr != nil {
if err == nil {
t.Fatalf("expected an error: %v", tc.err)
if !reflect.DeepEqual(tc.err, err) {
t.Fatalf("invalid error: got=%v, want=%v", err, tc.err)
func TestRecordBuilder(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer mem.AssertSize(t, 0)
schema := arrow.NewSchema(
arrow.Field{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32},
arrow.Field{Name: "f2-f64", Type: arrow.PrimitiveTypes.Float64},
b := array.NewRecordBuilder(mem, schema)
defer b.Release()
b.Field(0).(*array.Int32Builder).AppendValues([]int32{1, 2, 3, 4, 5, 6}, nil)
b.Field(0).(*array.Int32Builder).AppendValues([]int32{7, 8, 9, 10}, nil)
b.Field(1).(*array.Float64Builder).AppendValues([]float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil)
rec := b.NewRecord()
defer rec.Release()
if got, want := rec.Schema(), schema; !got.Equal(want) {
t.Fatalf("invalid schema: got=%#v, want=%#v", got, want)
if got, want := rec.NumRows(), int64(10); got != want {
t.Fatalf("invalid number of rows: got=%d, want=%d", got, want)
if got, want := rec.NumCols(), int64(2); got != want {
t.Fatalf("invalid number of columns: got=%d, want=%d", got, want)
if got, want := rec.ColumnName(0), schema.Field(0).Name; got != want {
t.Fatalf("invalid column name: got=%q, want=%q", got, want)
type testMessage struct {
Foo *testMessageFoo
Bars []*testMessageBar
func (m *testMessage) Reset() { *m = testMessage{} }
func (m *testMessage) GetFoo() *testMessageFoo {
if m != nil {
return m.Foo
return nil
func (m *testMessage) GetBars() []*testMessageBar {
if m != nil {
return m.Bars
return nil
// testMessageFoo is the nested "foo" fixture: a scalar A and a list B.
type testMessageFoo struct {
	A int32
	B []uint32
}

// Reset sets the value back to its zero value.
func (m *testMessageFoo) Reset() { *m = testMessageFoo{} }

// GetA returns m.A, or 0 when the receiver is nil.
func (m *testMessageFoo) GetA() int32 {
	if m != nil {
		return m.A
	}
	return 0
}

// GetB returns m.B, or nil when the receiver is nil.
func (m *testMessageFoo) GetB() []uint32 {
	if m != nil {
		return m.B
	}
	return nil
}
// testMessageBar is the repeated "bars" element fixture: a scalar C and a
// list D.
type testMessageBar struct {
	C int64
	D []uint64
}

// Reset sets the value back to its zero value.
func (m *testMessageBar) Reset() { *m = testMessageBar{} }

// GetC returns m.C, or 0 when the receiver is nil.
func (m *testMessageBar) GetC() int64 {
	if m != nil {
		return m.C
	}
	return 0
}

// GetD returns m.D, or nil when the receiver is nil.
func (m *testMessageBar) GetD() []uint64 {
	if m != nil {
		return m.D
	}
	return nil
}
var testMessageSchema = arrow.NewSchema(
arrow.Field{Name: "foo", Type: arrow.StructOf(
arrow.Field{Name: "a", Type: arrow.PrimitiveTypes.Int32},
arrow.Field{Name: "b", Type: arrow.ListOf(
arrow.Field{Name: "bars", Type: arrow.ListOf(
arrow.Field{Name: "c", Type: arrow.PrimitiveTypes.Int64},
arrow.Field{Name: "d", Type: arrow.ListOf(
func (m *testMessage) Fill(rec array.Record, row int) error {
// foo
if 0 < rec.NumCols() {
src0 := rec.Column(0).Data()
typedSrc0 := array.NewStructData(src0)
defer typedSrc0.Release()
if typedSrc0.IsValid(row) {
m0 := &testMessageFoo{}
// a
if 0 < typedSrc0.NumField() {
src0_0 := typedSrc0.Field(0).Data()
typedSrc0_0 := array.NewInt32Data(src0_0)
defer typedSrc0_0.Release()
m0.A = typedSrc0_0.Value(row)
// b
if 1 < typedSrc0.NumField() {
src0_1 := typedSrc0.Field(1).Data()
listSrc0_1 := array.NewListData(src0_1)
defer listSrc0_1.Release()
if listSrc0_1.IsValid(row) {
typedSrc0_1 := array.NewUint32Data(listSrc0_1.ListValues().Data())
start0_1 := int(listSrc0_1.Offsets()[row])
end0_1 := int(listSrc0_1.Offsets()[row+1])
for row := start0_1; row < end0_1; row++ {
m0.B = append(m0.B, typedSrc0_1.Value(row))
m.Foo = m0
// bars
if 1 < rec.NumCols() {
src1 := rec.Column(1).Data()
listSrc1 := array.NewListData(src1)
defer listSrc1.Release()
if listSrc1.IsValid(row) {
typedSrc1 := array.NewStructData(listSrc1.ListValues().Data())
defer typedSrc1.Release()
start1 := int(listSrc1.Offsets()[row])
end1 := int(listSrc1.Offsets()[row+1])
for row := start1; row < end1; row++ {
if typedSrc1.IsValid(row) {
m1 := &testMessageBar{}
// c
if 0 < typedSrc1.NumField() {
src1_0 := typedSrc1.Field(0).Data()
typedSrc1_0 := array.NewInt64Data(src1_0)
defer typedSrc1_0.Release()
m1.C = typedSrc1_0.Value(row)
// d
if 1 < typedSrc1.NumField() {
src1_1 := typedSrc1.Field(1).Data()
listSrc1_1 := array.NewListData(src1_1)
defer listSrc1_1.Release()
if listSrc1_1.IsValid(row) {
typedSrc1_1 := array.NewUint64Data(listSrc1_1.ListValues().Data())
defer typedSrc1_1.Release()
start1_1 := int(listSrc1_1.Offsets()[row])
end1_1 := int(listSrc1_1.Offsets()[row+1])
for row := start1_1; row < end1_1; row++ {
m1.D = append(m1.D, typedSrc1_1.Value(row))
m.Bars = append(m.Bars, m1)
} else {
m.Bars = append(m.Bars, nil)
return nil
func newTestMessageArrowRecordBuilder(mem memory.Allocator) *testMessageArrowRecordBuilder {
return &testMessageArrowRecordBuilder{
rb: array.NewRecordBuilder(mem, testMessageSchema),
type testMessageArrowRecordBuilder struct {
rb *array.RecordBuilder
func (b *testMessageArrowRecordBuilder) Build() array.Record {
return b.rb.NewRecord()
func (b *testMessageArrowRecordBuilder) Release() {
func (b *testMessageArrowRecordBuilder) Append(m *testMessage) {
// foo
builder0 := b.rb.Field(0)
v0 := m.GetFoo()
valueBuilder0 := builder0.(*array.StructBuilder)
if v0 == nil {
} else {
// a
v0_0 := v0.GetA()
builder0_0 := valueBuilder0.FieldBuilder(0)
valueBuilder0_0 := builder0_0.(*array.Int32Builder)
// b
v0_1 := v0.GetB()
builder0_1 := valueBuilder0.FieldBuilder(1)
listBuilder0_1 := builder0_1.(*array.ListBuilder)
if len(v0_1) == 0 {
} else {
valueBuilder0_1 := listBuilder0_1.ValueBuilder().(*array.Uint32Builder)
for _, item := range v0_1 {
// bars
builder1 := b.rb.Field(1)
v1 := m.GetBars()
listBuilder1 := builder1.(*array.ListBuilder)
if len(v1) == 0 {
} else {
valueBuilder1 := listBuilder1.ValueBuilder().(*array.StructBuilder)
for _, item := range v1 {
if item == nil {
} else {
// c
v1_0 := item.GetC()
builder1_0 := valueBuilder1.FieldBuilder(0)
valueBuilder1_0 := builder1_0.(*array.Int64Builder)
// d
v1_1 := item.GetD()
builder1_1 := valueBuilder1.FieldBuilder(1)
listBuilder1_1 := builder1_1.(*array.ListBuilder)
if len(v1_1) == 0 {
} else {
valueBuilder1_1 := listBuilder1_1.ValueBuilder().(*array.Uint64Builder)
for _, item := range v1_1 {
func TestRecordBuilderMessages(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer mem.AssertSize(t, 0)
b := newTestMessageArrowRecordBuilder(mem)
defer b.Release()
var msgs []*testMessage
for i := 0; i < 1000; i++ {
msg := &testMessage{
Foo: &testMessageFoo{
A: int32(i),
B: []uint32{2, 3, 4, 5, 6, 7, 8, 9},
Bars: []*testMessageBar{
C: 11,
D: []uint64{12, 13, 14},
C: 15,
D: []uint64{16, 17, 18, 19},
C: 20,
D: []uint64{21},
msgs = append(msgs, msg)
rec := b.Build()
defer rec.Release()
var got testMessage
for i := 0; i < 1000; i++ {
got.Fill(rec, i)
if !reflect.DeepEqual(&got, msgs[i]) {
t.Fatalf("row[%d], invalid record. got=%#v, want=%#v", i, &got, msgs[i])