blob: 8f498076e9ed1ed9ddc8b05ef38b6b20aece6637 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ipc
import (
"bytes"
"errors"
"io"
"testing"
"github.com/apache/arrow/go/v9/arrow"
"github.com/apache/arrow/go/v9/arrow/array"
"github.com/apache/arrow/go/v9/arrow/memory"
)
func TestMessageReaderBodyInAllocator(t *testing.T) {
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer mem.AssertSize(t, 0)
const numRecords = 3
buf := writeRecordsIntoBuffer(t, numRecords)
r := NewMessageReader(buf, WithAllocator(mem))
defer r.Release()
msgs := make([]*Message, 0)
for {
m, err := r.Message()
if errors.Is(err, io.EOF) {
break
}
if err != nil {
t.Fatal(err)
}
m.Retain()
msgs = append(msgs, m)
}
if len(msgs) != numRecords+1 {
t.Fatalf("expected %d messages but got %d", numRecords+1, len(msgs))
}
if mem.CurrentAlloc() <= 0 {
t.Fatal("message bodies should have been allocated")
}
for _, m := range msgs {
m.Release()
}
}
func writeRecordsIntoBuffer(t *testing.T, numRecords int) *bytes.Buffer {
mem := memory.NewCheckedAllocator(memory.NewGoAllocator())
defer mem.AssertSize(t, 0)
s, recs := getTestRecords(mem, numRecords)
buf := new(bytes.Buffer)
w := NewWriter(buf, WithAllocator(mem), WithSchema(s))
for _, rec := range recs {
err := w.Write(rec)
rec.Release()
if err != nil {
t.Fatal(err)
}
}
if err := w.Close(); err != nil {
t.Fatal(err)
}
return buf
}
func getTestRecords(mem memory.Allocator, numRecords int) (*arrow.Schema, []arrow.Record) {
meta := arrow.NewMetadata([]string{}, []string{})
s := arrow.NewSchema([]arrow.Field{
{Name: "test-col", Type: arrow.PrimitiveTypes.Int64},
}, &meta)
builder := array.NewRecordBuilder(mem, s)
defer builder.Release()
recs := make([]arrow.Record, numRecords)
for i := 0; i < len(recs); i++ {
col := builder.Field(0).(*array.Int64Builder)
for i := 0; i < 10; i++ {
col.Append(int64(i))
}
recs[i] = builder.NewRecord()
}
return s, recs
}