blob: 54fea7146fea0bbbe65a87c3a1b6037dd602c17f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pqarrow_test
import (
"bytes"
"context"
"io"
"os"
"path/filepath"
"testing"
"github.com/apache/arrow/go/v7/arrow"
"github.com/apache/arrow/go/v7/arrow/array"
"github.com/apache/arrow/go/v7/arrow/decimal128"
"github.com/apache/arrow/go/v7/arrow/memory"
"github.com/apache/arrow/go/v7/parquet/file"
"github.com/apache/arrow/go/v7/parquet/pqarrow"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// getDataDir returns the value of the PARQUET_TEST_DATA environment variable,
// which the tests in this file use as the directory containing the sample
// .parquet files. It panics if the variable is unset or empty.
func getDataDir() string {
	if dir := os.Getenv("PARQUET_TEST_DATA"); dir != "" {
		return dir
	}
	panic("please point PARQUET_TEST_DATA env var to the test data directory")
}
// TestArrowReaderAdHocReadDecimals opens each "<name>.parquet" decimal sample
// file from the PARQUET_TEST_DATA directory, reads it into an Arrow table via
// pqarrow, and verifies that the table has a single Decimal128 column of the
// expected precision/scale, made of one chunk of 24 values whose unscaled
// representation is 100, 200, ..., 2400.
//
// Every subtest uses a checked allocator and asserts that no memory is still
// outstanding once all deferred releases have run.
func TestArrowReaderAdHocReadDecimals(t *testing.T) {
	tests := []struct {
		file string
		typ  *arrow.Decimal128Type
	}{
		{"int32_decimal", &arrow.Decimal128Type{Precision: 4, Scale: 2}},
		{"int64_decimal", &arrow.Decimal128Type{Precision: 10, Scale: 2}},
		{"fixed_length_decimal", &arrow.Decimal128Type{Precision: 25, Scale: 2}},
		{"fixed_length_decimal_legacy", &arrow.Decimal128Type{Precision: 13, Scale: 2}},
		{"byte_array_decimal", &arrow.Decimal128Type{Precision: 4, Scale: 2}},
	}

	dataDir := getDataDir()
	for _, tt := range tests {
		t.Run(tt.file, func(t *testing.T) {
			// Registered first so it runs last, after every Release below.
			mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
			defer mem.AssertSize(t, 0)

			filename := filepath.Join(dataDir, tt.file+".parquet")
			require.FileExists(t, filename)

			rdr, err := file.OpenParquetFile(filename, false)
			require.NoError(t, err)
			defer rdr.Close()

			arrowRdr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{}, mem)
			require.NoError(t, err)

			tbl, err := arrowRdr.ReadTable(context.Background())
			require.NoError(t, err)
			defer tbl.Release()

			// Fatal: the column/field accesses below index position 0.
			require.EqualValues(t, 1, tbl.NumCols())

			gotType := tbl.Schema().Field(0).Type
			// The expected type goes first so it lines up with the "expected" label.
			assert.Truef(t, arrow.TypeEqual(gotType, tt.typ), "expected: %s\ngot: %s", tt.typ, gotType)

			const expectedLen = 24
			valCol := tbl.Column(0)
			assert.EqualValues(t, expectedLen, valCol.Len())

			// Fatal: Chunk(0) is read unconditionally right after.
			require.Len(t, valCol.Data().Chunks(), 1)
			chunk := valCol.Data().Chunk(0)

			bldr := array.NewDecimal128Builder(mem, tt.typ)
			defer bldr.Release()
			for i := 0; i < expectedLen; i++ {
				bldr.Append(decimal128.FromI64(int64((i + 1) * 100)))
			}

			expectedArr := bldr.NewDecimal128Array()
			defer expectedArr.Release()

			assert.Truef(t, array.ArrayEqual(expectedArr, chunk), "expected: %s\ngot: %s", expectedArr, chunk)
		})
	}
}
// TestRecordReaderParallel writes the table produced by
// makeDateTimeTypesTable to an in-memory parquet file, reads it back through
// a pqarrow record reader configured with BatchSize 3 and Parallel enabled,
// and checks that the schema round-trips and that the first two record
// batches match those produced by an array.TableReader over the original
// table with the same chunk size.
//
// A checked allocator is used and asserted empty at the end, so every
// retained record must be released before the function returns.
func TestRecordReaderParallel(t *testing.T) {
	// Registered first so it runs last, after every Release below.
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer mem.AssertSize(t, 0)

	tbl := makeDateTimeTypesTable(mem, true, true)
	defer tbl.Release()

	var buf bytes.Buffer
	require.NoError(t, pqarrow.WriteTable(tbl, &buf, tbl.NumRows(), nil, pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem))))

	pf, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
	require.NoError(t, err)

	reader, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{BatchSize: 3, Parallel: true}, mem)
	require.NoError(t, err)

	sc, err := reader.Schema()
	require.NoError(t, err)
	assert.Truef(t, tbl.Schema().Equal(sc), "expected: %s\ngot: %s", tbl.Schema(), sc)

	// These must be fatal: a nil reader would panic on the Release/Next calls
	// below instead of producing a readable test failure.
	rr, err := reader.GetRecordReader(context.Background(), nil, nil)
	require.NoError(t, err)
	require.NotNil(t, rr)
	defer rr.Release()

	var records []arrow.Record
	for rr.Next() {
		rec := rr.Record()
		assert.Truef(t, sc.Equal(rec.Schema()), "expected: %s\ngot: %s", sc, rec.Schema())

		// Take our own reference so the record stays usable after the reader
		// advances. The matching Release is deliberately deferred to function
		// exit because the records are compared after the loop.
		rec.Retain()
		defer rec.Release()
		records = append(records, rec)
	}
	assert.False(t, rr.Next())

	// Guard the index accesses below so a short read fails cleanly.
	require.GreaterOrEqual(t, len(records), 2, "expected at least two record batches")

	tr := array.NewTableReader(tbl, 3)
	defer tr.Release()

	assert.True(t, tr.Next())
	assert.Truef(t, array.RecordEqual(tr.Record(), records[0]), "expected: %s\ngot: %s", tr.Record(), records[0])
	assert.True(t, tr.Next())
	assert.Truef(t, array.RecordEqual(tr.Record(), records[1]), "expected: %s\ngot: %s", tr.Record(), records[1])
}
// TestRecordReaderSerial writes the table produced by makeDateTimeTypesTable
// to an in-memory parquet file and reads it back with a pqarrow record reader
// configured with BatchSize 2 (Parallel left at its zero value). It expects
// exactly three successful Read calls, each equal to the corresponding batch
// from an array.TableReader over the original table with chunk size 2,
// followed by a Read that returns a nil record and io.EOF.
//
// A checked allocator is used and asserted empty at the end.
func TestRecordReaderSerial(t *testing.T) {
	// Registered first so it runs last, after every Release below.
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer mem.AssertSize(t, 0)

	tbl := makeDateTimeTypesTable(mem, true, true)
	defer tbl.Release()

	var buf bytes.Buffer
	require.NoError(t, pqarrow.WriteTable(tbl, &buf, tbl.NumRows(), nil, pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem))))

	pf, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()))
	require.NoError(t, err)

	reader, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{BatchSize: 2}, mem)
	require.NoError(t, err)

	sc, err := reader.Schema()
	require.NoError(t, err)
	assert.Truef(t, tbl.Schema().Equal(sc), "expected: %s\ngot: %s", tbl.Schema(), sc)

	// These must be fatal: a nil reader would panic on the Release/Read calls
	// below instead of producing a readable test failure.
	rr, err := reader.GetRecordReader(context.Background(), nil, nil)
	require.NoError(t, err)
	require.NotNil(t, rr)
	defer rr.Release()

	tr := array.NewTableReader(tbl, 2)
	defer tr.Release()

	// Number of successful reads this test expects before EOF.
	const numBatches = 3
	for i := 0; i < numBatches; i++ {
		rec, err := rr.Read()
		require.NoErrorf(t, err, "reading batch %d", i)
		// Make sure the reference reader actually advanced before comparing
		// against its current record.
		require.Truef(t, tr.Next(), "table reader exhausted before batch %d", i)
		assert.Truef(t, array.RecordEqual(tr.Record(), rec), "expected: %s\ngot: %s", tr.Record(), rec)
	}

	// One more read must report end of stream with no record.
	rec, err := rr.Read()
	assert.Same(t, io.EOF, err)
	assert.Nil(t, rec)
}