| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package ipc_test |
| |
| import ( |
| "bytes" |
| "errors" |
| "fmt" |
| "io" |
| "math/rand" |
| "strconv" |
| "strings" |
| "testing" |
| |
| "github.com/stretchr/testify/assert" |
| "github.com/stretchr/testify/require" |
| |
| "github.com/apache/arrow/go/v17/arrow" |
| "github.com/apache/arrow/go/v17/arrow/array" |
| "github.com/apache/arrow/go/v17/arrow/ipc" |
| "github.com/apache/arrow/go/v17/arrow/memory" |
| ) |
| |
| func TestArrow12072(t *testing.T) { |
| schema := arrow.NewSchema( |
| []arrow.Field{ |
| {Name: "idx", Type: arrow.PrimitiveTypes.Int64}, |
| {Name: "A", Type: arrow.PrimitiveTypes.Int64}, |
| {Name: "B", Type: arrow.PrimitiveTypes.Int64}, |
| {Name: "C", Type: arrow.BinaryTypes.String}, |
| }, |
| nil, // no metadata |
| ) |
| mem := memory.NewGoAllocator() |
| counter := int64(0) |
| |
| b := array.NewRecordBuilder(mem, schema) |
| defer b.Release() |
| |
| const size = 3 |
| for i := 0; i < size; i++ { |
| b.Field(0).(*array.Int64Builder).AppendValues([]int64{counter}, nil) |
| counter++ |
| b.Field(1).(*array.Int64Builder).AppendValues( |
| []int64{int64(rand.Intn(100))}, nil) |
| b.Field(2).(*array.Int64Builder).AppendValues( |
| []int64{int64(rand.Intn(100))}, nil) |
| b.Field(3).(*array.StringBuilder).AppendValues( |
| []string{strconv.Itoa(rand.Intn(100))}, nil) |
| } |
| |
| rec := b.NewRecord() |
| defer rec.Release() |
| |
| tbl := array.NewTableFromRecords(schema, []arrow.Record{rec}) |
| defer tbl.Release() |
| |
| tr := array.NewTableReader(tbl, 1) |
| defer tr.Release() |
| |
| data := []arrow.Record{} |
| for tr.Next() { |
| rec := tr.Record() |
| rec.Retain() |
| defer rec.Release() |
| data = append(data, rec) |
| } |
| |
| // tests writing out and then reading back in slices of the same record of length 1 each |
| // testing the bug that was reported in ARROW-12072 involving offsets for string arrays |
| // and correct truncation of slices when writing ipc FixedWidthDataType |
| for _, rec := range data { |
| var buf []byte |
| assert.NotPanics(t, func() { |
| var output bytes.Buffer |
| w := ipc.NewWriter(&output, ipc.WithSchema(rec.Schema())) |
| assert.NoError(t, w.Write(rec)) |
| assert.NoError(t, w.Close()) |
| buf = output.Bytes() |
| }) |
| |
| assert.NotPanics(t, func() { |
| rdr, err := ipc.NewReader(bytes.NewReader(buf)) |
| assert.NoError(t, err) |
| for rdr.Next() { |
| out := rdr.Record() |
| assert.Truef(t, array.RecordEqual(rec, out), "expected: %s\ngot: %s\n", rec, out) |
| } |
| assert.NoError(t, rdr.Err()) |
| }) |
| } |
| } |
| |
| type testMessageReader struct { |
| counter int |
| } |
| |
| func (r *testMessageReader) Message() (*ipc.Message, error) { |
| if r.counter == 0 { |
| r.counter++ |
| // return schema message |
| schema := arrow.NewSchema([]arrow.Field{ |
| {Name: "f1", Type: arrow.PrimitiveTypes.Int32}, |
| }, nil) |
| var buf bytes.Buffer |
| writer := ipc.NewWriter(&buf, ipc.WithSchema(schema)) |
| if err := writer.Close(); err != nil { |
| return nil, err |
| } |
| reader := ipc.NewMessageReader(&buf) |
| return reader.Message() |
| } |
| // return error |
| return nil, errors.New("Error!") |
| } |
| func (r *testMessageReader) Release() {} |
| func (r *testMessageReader) Retain() {} |
| |
| // Ensure that if the MessageReader errors, we get the error from Read |
| func TestArrow14769(t *testing.T) { |
| reader, err := ipc.NewReaderFromMessageReader(&testMessageReader{}) |
| if err != nil { |
| t.Fatal(err) |
| } |
| _, err = reader.Read() |
| if err == nil || errors.Is(err, io.EOF) { |
| t.Fatalf("Expected an error, got %s", err) |
| } |
| if err.Error() != "Error!" { |
| t.Fatalf("Expected an error, not %s", err) |
| } |
| } |
| |
| func makeTestCol(t *testing.T, alloc memory.Allocator, vals []int32, nulls []bool) (arrow.Field, *arrow.Column) { |
| t.Helper() |
| fld := arrow.Field{Name: "test", Type: arrow.PrimitiveTypes.Int32, Nullable: nulls != nil} |
| |
| b := array.NewInt32Builder(alloc) |
| defer b.Release() |
| b.AppendValues(vals, nulls) |
| |
| arr := b.NewArray() |
| defer arr.Release() |
| |
| chk := arrow.NewChunked(arrow.PrimitiveTypes.Int32, []arrow.Array{arr}) |
| defer chk.Release() |
| |
| return fld, arrow.NewColumn(fld, chk) |
| } |
| |
| func makeTestTable(t *testing.T, fld arrow.Field, col *arrow.Column) arrow.Table { |
| t.Helper() |
| schema := arrow.NewSchema([]arrow.Field{fld}, nil) |
| return array.NewTable(schema, []arrow.Column{*col}, -1) |
| } |
| |
| func writeThenReadTable(t *testing.T, alloc memory.Allocator, table arrow.Table) arrow.Table { |
| t.Helper() |
| |
| // write the table into a buffer |
| buf := new(bytes.Buffer) |
| writer := ipc.NewWriter(buf, ipc.WithAllocator(alloc), ipc.WithSchema(table.Schema())) |
| tr := array.NewTableReader(table, 0) |
| defer tr.Release() |
| for tr.Next() { |
| require.NoError(t, writer.Write(tr.Record())) |
| } |
| require.NoError(t, writer.Close()) |
| |
| // read the table from the buffer |
| reader, err := ipc.NewReader(buf, ipc.WithAllocator(alloc)) |
| require.NoError(t, err) |
| defer reader.Release() |
| records := make([]arrow.Record, 0) |
| for reader.Next() { |
| rec := reader.Record() |
| rec.Retain() |
| defer rec.Release() |
| records = append(records, rec) |
| } |
| require.NoError(t, reader.Err()) |
| return array.NewTableFromRecords(reader.Schema(), records) |
| } |
| |
| func TestWriteColumnWithOffset(t *testing.T) { |
| alloc := memory.NewCheckedAllocator(memory.DefaultAllocator) |
| defer alloc.AssertSize(t, 0) |
| |
| t.Run("some nulls", func(t *testing.T) { |
| vals := []int32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10} |
| nulls := []bool{true, false, true, false, true, false, true, false, true, false, true} |
| fld, col := makeTestCol(t, alloc, vals, nulls) |
| defer col.Release() |
| |
| // slice the column so there are offsets |
| col = array.NewColumnSlice(col, 3, 8) |
| defer col.Release() |
| |
| table := makeTestTable(t, fld, col) |
| defer table.Release() |
| |
| table = writeThenReadTable(t, alloc, table) |
| defer table.Release() |
| |
| require.EqualValues(t, 1, table.NumCols()) |
| col = table.Column(0) |
| colArr := col.Data().Chunk(0).(*array.Int32) |
| require.EqualValues(t, 5, colArr.Len()) |
| assert.True(t, colArr.IsNull(0)) |
| assert.False(t, colArr.IsNull(1)) |
| assert.True(t, colArr.IsNull(2)) |
| assert.False(t, colArr.IsNull(3)) |
| assert.True(t, colArr.IsNull(4)) |
| }) |
| |
| t.Run("all nulls", func(t *testing.T) { |
| vals := []int32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10} |
| nulls := []bool{false, false, false, false, false, false, false, false, false, false, false} |
| fld, col := makeTestCol(t, alloc, vals, nulls) |
| defer col.Release() |
| |
| // slice the column so there are offsets |
| col = array.NewColumnSlice(col, 3, 8) |
| defer col.Release() |
| |
| table := makeTestTable(t, fld, col) |
| defer table.Release() |
| |
| table = writeThenReadTable(t, alloc, table) |
| defer table.Release() |
| |
| require.EqualValues(t, 1, table.NumCols()) |
| col = table.Column(0) |
| colArr := col.Data().Chunk(0).(*array.Int32) |
| require.EqualValues(t, 5, colArr.Len()) |
| for i := 0; i < colArr.Len(); i++ { |
| assert.True(t, colArr.IsNull(i)) |
| } |
| }) |
| |
| t.Run("no nulls", func(t *testing.T) { |
| vals := []int32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10} |
| nulls := []bool{true, true, true, true, true, true, true, true, true, true, true} |
| fld, col := makeTestCol(t, alloc, vals, nulls) |
| defer col.Release() |
| |
| // slice the column so there are offsets |
| col = array.NewColumnSlice(col, 3, 8) |
| defer col.Release() |
| |
| table := makeTestTable(t, fld, col) |
| defer table.Release() |
| |
| table = writeThenReadTable(t, alloc, table) |
| defer table.Release() |
| |
| require.EqualValues(t, 1, table.NumCols()) |
| col = table.Column(0) |
| colArr := col.Data().Chunk(0).(*array.Int32) |
| require.EqualValues(t, 5, colArr.Len()) |
| for i := 0; i < colArr.Len(); i++ { |
| assert.False(t, colArr.IsNull(i)) |
| } |
| }) |
| } |
| |
| func TestIPCTable(t *testing.T) { |
| pool := memory.NewGoAllocator() |
| schema := arrow.NewSchema([]arrow.Field{{Name: "f1", Type: arrow.PrimitiveTypes.Int32}}, nil) |
| b := array.NewRecordBuilder(pool, schema) |
| defer b.Release() |
| b.Field(0).(*array.Int32Builder).AppendValues([]int32{1, 2, 3, 4}, []bool{true, true, false, true}) |
| |
| rec1 := b.NewRecord() |
| defer rec1.Release() |
| |
| tbl := array.NewTableFromRecords(schema, []arrow.Record{rec1}) |
| defer tbl.Release() |
| |
| var buf bytes.Buffer |
| ipcWriter := ipc.NewWriter(&buf, ipc.WithAllocator(pool), ipc.WithSchema(schema)) |
| defer func(ipcWriter *ipc.Writer) { |
| err := ipcWriter.Close() |
| if err != nil { |
| t.Fatalf("error closing ipc writer: %s", err.Error()) |
| } |
| }(ipcWriter) |
| |
| t.Log("Reading data before") |
| tr := array.NewTableReader(tbl, 2) |
| defer tr.Release() |
| |
| n := 0 |
| for tr.Next() { |
| rec := tr.Record() |
| for i, col := range rec.Columns() { |
| t.Logf("rec[%d][%q]: %v nulls:%v\n", n, |
| rec.ColumnName(i), col, col.NullBitmapBytes()) |
| } |
| n++ |
| err := ipcWriter.Write(rec) |
| if err != nil { |
| panic(err) |
| } |
| } |
| |
| t.Log("Reading data after") |
| ipcReader, err := ipc.NewReader(bytes.NewReader(buf.Bytes()), ipc.WithAllocator(pool)) |
| if err != nil { |
| panic(err) |
| } |
| n = 0 |
| for ipcReader.Next() { |
| rec := ipcReader.Record() |
| for i, col := range rec.Columns() { |
| t.Logf("rec[%d][%q]: %v nulls:%v\n", n, |
| rec.ColumnName(i), col, col.NullBitmapBytes()) |
| } |
| n++ |
| } |
| } |
| |
| // ARROW-18317 |
| func TestDictionary(t *testing.T) { |
| pool := memory.NewCheckedAllocator(memory.NewGoAllocator()) |
| defer pool.AssertSize(t, 0) |
| |
| // A schema with a single dictionary field |
| schema := arrow.NewSchema([]arrow.Field{{Name: "field", Type: &arrow.DictionaryType{ |
| IndexType: arrow.PrimitiveTypes.Uint16, |
| ValueType: arrow.BinaryTypes.String, |
| Ordered: false, |
| }}}, nil) |
| |
| // IPC writer and reader |
| var bufWriter bytes.Buffer |
| ipcWriter := ipc.NewWriter(&bufWriter, ipc.WithSchema(schema), ipc.WithAllocator(pool), ipc.WithDictionaryDeltas(false)) |
| defer ipcWriter.Close() |
| |
| bufReader := bytes.NewReader([]byte{}) |
| var ipcReader *ipc.Reader |
| |
| bldr := array.NewBuilder(pool, schema.Field(0).Type) |
| defer bldr.Release() |
| require.NoError(t, bldr.UnmarshalJSON([]byte(`["value_0"]`))) |
| |
| arr := bldr.NewArray() |
| defer arr.Release() |
| // Create a first record with field = "value_0" |
| record := array.NewRecord(schema, []arrow.Array{arr}, 1) |
| defer record.Release() |
| |
| expectedJson, err := record.MarshalJSON() |
| require.NoError(t, err) |
| // Serialize and deserialize the record via an IPC stream |
| json, ipcReader, err := encodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader) |
| require.NoError(t, err) |
| // Compare the expected JSON with the actual JSON |
| require.JSONEq(t, string(expectedJson), string(json)) |
| |
| // Create a second record with field = "value_1" |
| require.NoError(t, bldr.UnmarshalJSON([]byte(`["value_1"]`))) |
| arr = bldr.NewArray() |
| defer arr.Release() |
| record = array.NewRecord(schema, []arrow.Array{arr}, 1) |
| |
| // record, _, err = array.RecordFromJSON(pool, schema, strings.NewReader(`[{"field": ["value_1"]}]`)) |
| // require.NoError(t, err) |
| defer record.Release() |
| |
| expectedJson, err = record.MarshalJSON() |
| require.NoError(t, err) |
| // Serialize and deserialize the record via an IPC stream |
| json, ipcReader, err = encodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader) |
| require.NoError(t, err) |
| // Compare the expected JSON with the actual JSON |
| // field = "value_0" but should be "value_1" |
| require.JSONEq(t, string(expectedJson), string(json)) |
| require.NoError(t, ipcReader.Err()) |
| ipcReader.Release() |
| } |
| |
| // ARROW-18326 |
| func TestDictionaryDeltas(t *testing.T) { |
| pool := memory.NewCheckedAllocator(memory.NewGoAllocator()) |
| defer pool.AssertSize(t, 0) |
| |
| // A schema with a single dictionary field |
| schema := arrow.NewSchema([]arrow.Field{{Name: "field", Type: &arrow.DictionaryType{ |
| IndexType: arrow.PrimitiveTypes.Uint16, |
| ValueType: arrow.BinaryTypes.String, |
| Ordered: false, |
| }}}, nil) |
| |
| // IPC writer and reader |
| var bufWriter bytes.Buffer |
| ipcWriter := ipc.NewWriter(&bufWriter, ipc.WithSchema(schema), ipc.WithAllocator(pool), ipc.WithDictionaryDeltas(true)) |
| defer ipcWriter.Close() |
| |
| bufReader := bytes.NewReader([]byte{}) |
| var ipcReader *ipc.Reader |
| |
| bldr := array.NewBuilder(pool, schema.Field(0).Type) |
| defer bldr.Release() |
| require.NoError(t, bldr.UnmarshalJSON([]byte(`["value_0"]`))) |
| |
| arr := bldr.NewArray() |
| defer arr.Release() |
| // Create a first record with field = "value_0" |
| record := array.NewRecord(schema, []arrow.Array{arr}, 1) |
| defer record.Release() |
| |
| expectedJson, err := record.MarshalJSON() |
| require.NoError(t, err) |
| // Serialize and deserialize the record via an IPC stream |
| json, ipcReader, err := encodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader) |
| require.NoError(t, err) |
| // Compare the expected JSON with the actual JSON |
| require.JSONEq(t, string(expectedJson), string(json)) |
| |
| // Create a second record with field = "value_1" |
| require.NoError(t, bldr.UnmarshalJSON([]byte(`["value_1"]`))) |
| arr = bldr.NewArray() |
| defer arr.Release() |
| record = array.NewRecord(schema, []arrow.Array{arr}, 1) |
| defer record.Release() |
| |
| expectedJson, err = record.MarshalJSON() |
| require.NoError(t, err) |
| // Serialize and deserialize the record via an IPC stream |
| json, ipcReader, err = encodeDecodeIpcStream(t, record, &bufWriter, ipcWriter, bufReader, ipcReader) |
| require.NoError(t, err) |
| // Compare the expected JSON with the actual JSON |
| // field = "value_0" but should be "value_1" |
| require.JSONEq(t, string(expectedJson), string(json)) |
| require.NoError(t, ipcReader.Err()) |
| ipcReader.Release() |
| } |
| |
| // Encode and decode a record over a tuple of IPC writer and reader. |
| // IPC writer and reader are the same from one call to another. |
| func encodeDecodeIpcStream(t *testing.T, |
| record arrow.Record, |
| bufWriter *bytes.Buffer, ipcWriter *ipc.Writer, |
| bufReader *bytes.Reader, ipcReader *ipc.Reader) ([]byte, *ipc.Reader, error) { |
| |
| // Serialize the record via an ipc writer |
| if err := ipcWriter.Write(record); err != nil { |
| return nil, ipcReader, err |
| } |
| serializedRecord := bufWriter.Bytes() |
| bufWriter.Reset() |
| |
| // Deserialize the record via an ipc reader |
| bufReader.Reset(serializedRecord) |
| if ipcReader == nil { |
| newIpcReader, err := ipc.NewReader(bufReader) |
| if err != nil { |
| return nil, newIpcReader, err |
| } |
| ipcReader = newIpcReader |
| } |
| ipcReader.Next() |
| record = ipcReader.Record() |
| |
| // Return the decoded record as a json string |
| json, err := record.MarshalJSON() |
| if err != nil { |
| return nil, ipcReader, err |
| } |
| return json, ipcReader, nil |
| } |
| |
// Example_mapSlice builds a six-row map<utf8, utf8> record from JSON, takes a
// one-row slice of it, writes that slice to an in-memory IPC stream, reads
// the stream back and prints the first decoded record. The expected output
// shows only the sliced row (keys "index3" and "tag_int").
| func Example_mapSlice() { |
| mem := memory.DefaultAllocator |
| dt := arrow.MapOf(arrow.BinaryTypes.String, arrow.BinaryTypes.String) |
| schema := arrow.NewSchema([]arrow.Field{{ |
| Name: "map", |
| Type: dt, |
| }}, nil) |
| |
| arr, _, err := array.FromJSON(mem, dt, strings.NewReader(`[ |
| [{"key": "index1", "value": "main2"}], |
| [{"key": "index3", "value": "main4"}, {"key": "tag_int", "value": ""}], |
| [{"key":"index5","value":"main6"},{"key":"tag_int","value":""}], |
| [{"key":"index6","value":"main7"},{"key":"tag_int","value":""}], |
| [{"key":"index7","value":"main8"},{"key":"tag_int","value":""}], |
| [{"key":"index8","value":"main9"}] |
| ]`)) |
| if err != nil { |
| panic(err) |
| } |
| defer arr.Release() |
| |
| rec := array.NewRecord(schema, []arrow.Array{arr}, int64(arr.Len())) |
| defer rec.Release() |
// Keep only the second row (index 1); this is the record that gets written.
| rec2 := rec.NewSlice(1, 2) |
| defer rec2.Release() |
| |
| var buf bytes.Buffer |
| w := ipc.NewWriter(&buf, ipc.WithSchema(rec.Schema())) |
| if err := w.Write(rec2); err != nil { |
| panic(err) |
| } |
| if err := w.Close(); err != nil { |
| panic(err) |
| } |
| |
| r, err := ipc.NewReader(&buf) |
| if err != nil { |
| panic(err) |
| } |
| defer r.Release() |
| |
// NOTE(review): the result of Next is not checked here, so a failed read
// would go unnoticed until the output comparison.
// NOTE(review): the Output block below is compared against what is printed;
// confirm its spacing still matches the record's printed form.
| r.Next() |
| fmt.Println(r.Record()) |
| |
| // Output: |
| // record: |
| // schema: |
| // fields: 1 |
| // - map: type=map<utf8, utf8, items_nullable> |
| // rows: 1 |
| // col[0][map]: [{["index3" "tag_int"] ["main4" ""]}] |
| } |
| |
// Example_listSlice builds a seven-row list<utf8> record from JSON, takes a
// one-row slice of it, writes that slice to an in-memory IPC stream, reads
// the stream back and prints the first decoded record. The expected output
// shows only the sliced row (["index3" "tag_int"]).
| func Example_listSlice() { |
| mem := memory.DefaultAllocator |
| dt := arrow.ListOf(arrow.BinaryTypes.String) |
| schema := arrow.NewSchema([]arrow.Field{{ |
| Name: "list", |
| Type: dt, |
| }}, nil) |
| |
| arr, _, err := array.FromJSON(mem, dt, strings.NewReader(`[ |
| ["index1"], |
| ["index3", "tag_int"], ["index5", "tag_int"], |
| ["index6", "tag_int"], ["index7", "tag_int"], |
| ["index7", "tag_int"], |
| ["index8"] |
| ]`)) |
| if err != nil { |
| panic(err) |
| } |
| defer arr.Release() |
| |
| rec := array.NewRecord(schema, []arrow.Array{arr}, int64(arr.Len())) |
| defer rec.Release() |
// Keep only the second row (index 1); this is the record that gets written.
| rec2 := rec.NewSlice(1, 2) |
| defer rec2.Release() |
| |
| var buf bytes.Buffer |
| w := ipc.NewWriter(&buf, ipc.WithSchema(rec.Schema())) |
| if err := w.Write(rec2); err != nil { |
| panic(err) |
| } |
| if err := w.Close(); err != nil { |
| panic(err) |
| } |
| |
| r, err := ipc.NewReader(&buf) |
| if err != nil { |
| panic(err) |
| } |
| defer r.Release() |
| |
// NOTE(review): the result of Next is not checked here, so a failed read
// would go unnoticed until the output comparison.
// NOTE(review): the Output block below is compared against what is printed;
// confirm its spacing still matches the record's printed form.
| r.Next() |
| fmt.Println(r.Record()) |
| |
| // Output: |
| // record: |
| // schema: |
| // fields: 1 |
| // - list: type=list<item: utf8, nullable> |
| // rows: 1 |
| // col[0][list]: [["index3" "tag_int"]] |
| } |
| |
| func TestIpcEmptyMap(t *testing.T) { |
| mem := memory.NewCheckedAllocator(memory.DefaultAllocator) |
| defer mem.AssertSize(t, 0) |
| |
| dt := arrow.MapOf(arrow.BinaryTypes.String, arrow.BinaryTypes.String) |
| schema := arrow.NewSchema([]arrow.Field{{ |
| Name: "map", |
| Type: dt, |
| }}, nil) |
| |
| arr, _, err := array.FromJSON(mem, dt, strings.NewReader(`[]`)) |
| require.NoError(t, err) |
| defer arr.Release() |
| |
| rec := array.NewRecord(schema, []arrow.Array{arr}, int64(arr.Len())) |
| defer rec.Release() |
| |
| var buf bytes.Buffer |
| w := ipc.NewWriter(&buf, ipc.WithSchema(rec.Schema())) |
| require.NoError(t, w.Write(rec)) |
| assert.NoError(t, w.Close()) |
| |
| r, err := ipc.NewReader(&buf) |
| require.NoError(t, err) |
| defer r.Release() |
| |
| assert.True(t, r.Next()) |
| assert.Zero(t, r.Record().NumRows()) |
| assert.True(t, arrow.TypeEqual(dt, r.Record().Column(0).DataType())) |
| } |