| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package pqarrow_test |
| |
| import ( |
| "bytes" |
| "context" |
| "errors" |
| "fmt" |
| "math" |
| "strconv" |
| "strings" |
| "testing" |
| |
| "github.com/apache/arrow/go/v10/arrow" |
| "github.com/apache/arrow/go/v10/arrow/array" |
| "github.com/apache/arrow/go/v10/arrow/bitutil" |
| "github.com/apache/arrow/go/v10/arrow/decimal128" |
| "github.com/apache/arrow/go/v10/arrow/memory" |
| "github.com/apache/arrow/go/v10/internal/bitutils" |
| "github.com/apache/arrow/go/v10/internal/utils" |
| "github.com/apache/arrow/go/v10/parquet" |
| "github.com/apache/arrow/go/v10/parquet/compress" |
| "github.com/apache/arrow/go/v10/parquet/file" |
| "github.com/apache/arrow/go/v10/parquet/internal/encoding" |
| "github.com/apache/arrow/go/v10/parquet/internal/testutils" |
| "github.com/apache/arrow/go/v10/parquet/pqarrow" |
| "github.com/apache/arrow/go/v10/parquet/schema" |
| "github.com/stretchr/testify/assert" |
| "github.com/stretchr/testify/require" |
| "github.com/stretchr/testify/suite" |
| ) |
| |
| func makeSimpleTable(values *arrow.Chunked, nullable bool) arrow.Table { |
| sc := arrow.NewSchema([]arrow.Field{{Name: "col", Type: values.DataType(), Nullable: nullable}}, nil) |
| column := arrow.NewColumn(sc.Field(0), values) |
| defer column.Release() |
| return array.NewTable(sc, []arrow.Column{*column}, -1) |
| } |
| |
// makeDateTimeTypesTable builds a six-row table of temporal columns
// (date32, timestamps in ms/us/ns, time32/time64) with one null row at
// index 3.
//
// When expected is true, column f3 is built with microsecond timestamps
// instead of nanoseconds — the form the tests expect after a round trip
// through parquet (the ns column is coerced to us on write). When
// addFieldMeta is true each field carries a PARQUET:field_id metadata
// entry whose value is the 1-based field index.
func makeDateTimeTypesTable(mem memory.Allocator, expected bool, addFieldMeta bool) arrow.Table {
	isValid := []bool{true, true, true, false, true, true}

	// roundtrip without modification
	f0 := arrow.Field{Name: "f0", Type: arrow.FixedWidthTypes.Date32, Nullable: true}
	f1 := arrow.Field{Name: "f1", Type: arrow.FixedWidthTypes.Timestamp_ms, Nullable: true}
	f2 := arrow.Field{Name: "f2", Type: arrow.FixedWidthTypes.Timestamp_us, Nullable: true}
	f3 := arrow.Field{Name: "f3", Type: arrow.FixedWidthTypes.Timestamp_ns, Nullable: true}
	// f3X is the microsecond variant of f3 used for the "expected" table
	f3X := arrow.Field{Name: "f3", Type: arrow.FixedWidthTypes.Timestamp_us, Nullable: true}
	f4 := arrow.Field{Name: "f4", Type: arrow.FixedWidthTypes.Time32ms, Nullable: true}
	f5 := arrow.Field{Name: "f5", Type: arrow.FixedWidthTypes.Time64us, Nullable: true}
	f6 := arrow.Field{Name: "f6", Type: arrow.FixedWidthTypes.Time64ns, Nullable: true}

	fieldList := []arrow.Field{f0, f1, f2}
	if expected {
		fieldList = append(fieldList, f3X)
	} else {
		fieldList = append(fieldList, f3)
	}
	fieldList = append(fieldList, f4, f5, f6)

	if addFieldMeta {
		for idx := range fieldList {
			fieldList[idx].Metadata = arrow.NewMetadata([]string{"PARQUET:field_id"}, []string{strconv.Itoa(idx + 1)})
		}
	}
	arrsc := arrow.NewSchema(fieldList, nil)

	// raw values per unit; the *ns values are the *us values scaled by 1000
	d32Values := []arrow.Date32{1489269000, 1489270000, 1489271000, 1489272000, 1489272000, 1489273000}
	ts64nsValues := []arrow.Timestamp{1489269000000, 1489270000000, 1489271000000, 1489272000000, 1489272000000, 1489273000000}
	ts64usValues := []arrow.Timestamp{1489269000, 1489270000, 1489271000, 1489272000, 1489272000, 1489273000}
	ts64msValues := []arrow.Timestamp{1489269, 1489270, 1489271, 1489272, 1489272, 1489273}
	t32Values := []arrow.Time32{1489269000, 1489270000, 1489271000, 1489272000, 1489272000, 1489273000}
	t64nsValues := []arrow.Time64{1489269000000, 1489270000000, 1489271000000, 1489272000000, 1489272000000, 1489273000000}
	t64usValues := []arrow.Time64{1489269000, 1489270000, 1489271000, 1489272000, 1489272000, 1489273000}

	// one builder per field, in schema order
	builders := make([]array.Builder, 0, len(fieldList))
	for _, f := range fieldList {
		bldr := array.NewBuilder(mem, f.Type)
		defer bldr.Release()
		builders = append(builders, bldr)
	}

	builders[0].(*array.Date32Builder).AppendValues(d32Values, isValid)
	builders[1].(*array.TimestampBuilder).AppendValues(ts64msValues, isValid)
	builders[2].(*array.TimestampBuilder).AppendValues(ts64usValues, isValid)
	if expected {
		// expected table holds the us values f3 coerces to on round trip
		builders[3].(*array.TimestampBuilder).AppendValues(ts64usValues, isValid)
	} else {
		builders[3].(*array.TimestampBuilder).AppendValues(ts64nsValues, isValid)
	}
	builders[4].(*array.Time32Builder).AppendValues(t32Values, isValid)
	builders[5].(*array.Time64Builder).AppendValues(t64usValues, isValid)
	builders[6].(*array.Time64Builder).AppendValues(t64nsValues, isValid)

	cols := make([]arrow.Column, 0, len(fieldList))
	for idx, field := range fieldList {
		arr := builders[idx].NewArray()
		defer arr.Release()

		chunked := arrow.NewChunked(field.Type, []arrow.Array{arr})
		defer chunked.Release()
		col := arrow.NewColumn(field, chunked)
		defer col.Release()
		cols = append(cols, *col)
	}

	return array.NewTable(arrsc, cols, int64(len(isValid)))
}
| |
// TestWriteArrowCols writes each column of the date/time table through
// ArrowColumnWriter into a single row group and then reads the raw parquet
// column data back, verifying values, definition levels, and that the ns
// timestamp column was coerced to us (V2_4 writer properties).
func TestWriteArrowCols(t *testing.T) {
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer mem.AssertSize(t, 0)

	tbl := makeDateTimeTypesTable(mem, false, false)
	defer tbl.Release()

	psc, err := pqarrow.ToParquet(tbl.Schema(), nil, pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem)))
	require.NoError(t, err)

	manifest, err := pqarrow.NewSchemaManifest(psc, nil, nil)
	require.NoError(t, err)

	sink := encoding.NewBufferWriter(0, memory.DefaultAllocator)
	writer := file.NewParquetWriter(sink, psc.Root(), file.WithWriterProps(parquet.NewWriterProperties(parquet.WithVersion(parquet.V2_4))))

	srgw := writer.AppendRowGroup()
	ctx := pqarrow.NewArrowWriteContext(context.TODO(), nil)

	// write every column of the table into the single row group
	for i := int64(0); i < tbl.NumCols(); i++ {
		acw, err := pqarrow.NewArrowColumnWriter(tbl.Column(int(i)).Data(), 0, tbl.NumRows(), manifest, srgw, int(i))
		require.NoError(t, err)
		require.NoError(t, acw.Write(ctx))
	}
	require.NoError(t, srgw.Close())
	require.NoError(t, writer.Close())

	// the "expected" table carries us timestamps where the input had ns
	expected := makeDateTimeTypesTable(mem, true, false)
	defer expected.Release()

	reader, err := file.NewParquetReader(bytes.NewReader(sink.Bytes()))
	require.NoError(t, err)

	assert.EqualValues(t, expected.NumCols(), reader.MetaData().Schema.NumColumns())
	assert.EqualValues(t, expected.NumRows(), reader.NumRows())
	assert.EqualValues(t, 1, reader.NumRowGroups())

	rgr := reader.RowGroup(0)

	for i := 0; i < int(expected.NumCols()); i++ {
		var (
			total        int64
			read         int
			defLevelsOut = make([]int16, int(expected.NumRows()))
			arr          = expected.Column(i).Data().Chunk(0)
		)
		// dispatch on physical width: 32-bit columns (date32/time32) vs
		// 64-bit columns (date64/time64/timestamp)
		switch expected.Schema().Field(i).Type.(arrow.FixedWidthDataType).BitWidth() {
		case 32:
			col, err := rgr.Column(i)
			assert.NoError(t, err)
			colReader := col.(*file.Int32ColumnChunkReader)
			vals := make([]int32, int(expected.NumRows()))
			total, read, err = colReader.ReadBatch(expected.NumRows(), vals, defLevelsOut, nil)
			require.NoError(t, err)

			// ReadBatch packs only non-null values into vals, so track the
			// null count to index the compacted output
			nulls := 0
			for j := 0; j < arr.Len(); j++ {
				if arr.IsNull(j) {
					nulls++
					continue
				}

				switch v := arr.(type) {
				case *array.Date32:
					assert.EqualValues(t, v.Value(j), vals[j-nulls])
				case *array.Time32:
					assert.EqualValues(t, v.Value(j), vals[j-nulls])
				}
			}
		case 64:
			col, err := rgr.Column(i)
			assert.NoError(t, err)
			colReader := col.(*file.Int64ColumnChunkReader)
			vals := make([]int64, int(expected.NumRows()))
			total, read, err = colReader.ReadBatch(expected.NumRows(), vals, defLevelsOut, nil)
			require.NoError(t, err)

			nulls := 0
			for j := 0; j < arr.Len(); j++ {
				if arr.IsNull(j) {
					nulls++
					continue
				}

				switch v := arr.(type) {
				case *array.Date64:
					assert.EqualValues(t, v.Value(j), vals[j-nulls])
				case *array.Time64:
					assert.EqualValues(t, v.Value(j), vals[j-nulls])
				case *array.Timestamp:
					assert.EqualValues(t, v.Value(j), vals[j-nulls])
				}
			}
		}
		// total counts levels (incl. the null slot); read counts values,
		// hence NumRows-1 with one null row
		assert.EqualValues(t, expected.NumRows(), total)
		assert.EqualValues(t, expected.NumRows()-1, read)
		assert.Equal(t, []int16{1, 1, 1, 0, 1, 1}, defLevelsOut)
	}
}
| |
// TestWriteArrowInt96 verifies that enabling deprecated INT96 timestamps
// stores the nanosecond timestamp column as parquet Int96 and that the
// values survive the round trip through Int96.ToTime().
func TestWriteArrowInt96(t *testing.T) {
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer mem.AssertSize(t, 0)

	tbl := makeDateTimeTypesTable(mem, false, false)
	defer tbl.Release()

	props := pqarrow.NewArrowWriterProperties(pqarrow.WithDeprecatedInt96Timestamps(true))
	psc, err := pqarrow.ToParquet(tbl.Schema(), nil, props)
	require.NoError(t, err)

	manifest, err := pqarrow.NewSchemaManifest(psc, nil, nil)
	require.NoError(t, err)

	sink := encoding.NewBufferWriter(0, memory.DefaultAllocator)
	writer := file.NewParquetWriter(sink, psc.Root())

	srgw := writer.AppendRowGroup()
	ctx := pqarrow.NewArrowWriteContext(context.TODO(), &props)

	for i := int64(0); i < tbl.NumCols(); i++ {
		acw, err := pqarrow.NewArrowColumnWriter(tbl.Column(int(i)).Data(), 0, tbl.NumRows(), manifest, srgw, int(i))
		require.NoError(t, err)
		require.NoError(t, acw.Write(ctx))
	}
	require.NoError(t, srgw.Close())
	require.NoError(t, writer.Close())

	// with Int96 there is no unit coercion, so expected == input table
	expected := makeDateTimeTypesTable(mem, false, false)
	defer expected.Release()

	reader, err := file.NewParquetReader(bytes.NewReader(sink.Bytes()))
	require.NoError(t, err)

	assert.EqualValues(t, expected.NumCols(), reader.MetaData().Schema.NumColumns())
	assert.EqualValues(t, expected.NumRows(), reader.NumRows())
	assert.EqualValues(t, 1, reader.NumRowGroups())

	rgr := reader.RowGroup(0)
	// column 3 is the ns timestamp column written as Int96
	tsRdr, err := rgr.Column(3)
	assert.NoError(t, err)
	assert.Equal(t, parquet.Types.Int96, tsRdr.Type())

	rdr := tsRdr.(*file.Int96ColumnChunkReader)
	vals := make([]parquet.Int96, expected.NumRows())
	defLevels := make([]int16, int(expected.NumRows()))

	total, read, _ := rdr.ReadBatch(expected.NumRows(), vals, defLevels, nil)
	assert.EqualValues(t, expected.NumRows(), total)
	assert.EqualValues(t, expected.NumRows()-1, read)
	assert.Equal(t, []int16{1, 1, 1, 0, 1, 1}, defLevels)

	// row 3 is null, so the packed values skip it: vals[3] holds row 4, etc.
	data := expected.Column(3).Data().Chunk(0).(*array.Timestamp)
	assert.EqualValues(t, data.Value(0), vals[0].ToTime().UnixNano())
	assert.EqualValues(t, data.Value(1), vals[1].ToTime().UnixNano())
	assert.EqualValues(t, data.Value(2), vals[2].ToTime().UnixNano())
	assert.EqualValues(t, data.Value(4), vals[3].ToTime().UnixNano())
	assert.EqualValues(t, data.Value(5), vals[4].ToTime().UnixNano())
}
| |
| func writeTableToBuffer(t *testing.T, tbl arrow.Table, rowGroupSize int64, props pqarrow.ArrowWriterProperties) *memory.Buffer { |
| sink := encoding.NewBufferWriter(0, memory.DefaultAllocator) |
| wrprops := parquet.NewWriterProperties(parquet.WithVersion(parquet.V1_0)) |
| psc, err := pqarrow.ToParquet(tbl.Schema(), wrprops, props) |
| require.NoError(t, err) |
| |
| manifest, err := pqarrow.NewSchemaManifest(psc, nil, nil) |
| require.NoError(t, err) |
| |
| writer := file.NewParquetWriter(sink, psc.Root(), file.WithWriterProps(wrprops)) |
| ctx := pqarrow.NewArrowWriteContext(context.TODO(), &props) |
| |
| offset := int64(0) |
| for offset < tbl.NumRows() { |
| sz := utils.Min(rowGroupSize, tbl.NumRows()-offset) |
| srgw := writer.AppendRowGroup() |
| for i := 0; i < int(tbl.NumCols()); i++ { |
| col := tbl.Column(i) |
| acw, err := pqarrow.NewArrowColumnWriter(col.Data(), offset, sz, manifest, srgw, i) |
| require.NoError(t, err) |
| require.NoError(t, acw.Write(ctx)) |
| } |
| srgw.Close() |
| offset += sz |
| } |
| writer.Close() |
| |
| return sink.Finish() |
| } |
| |
| func simpleRoundTrip(t *testing.T, tbl arrow.Table, rowGroupSize int64) { |
| buf := writeTableToBuffer(t, tbl, rowGroupSize, pqarrow.DefaultWriterProps()) |
| defer buf.Release() |
| |
| rdr, err := file.NewParquetReader(bytes.NewReader(buf.Bytes())) |
| require.NoError(t, err) |
| |
| ardr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator) |
| require.NoError(t, err) |
| |
| for i := 0; i < int(tbl.NumCols()); i++ { |
| crdr, err := ardr.GetColumn(context.TODO(), i) |
| require.NoError(t, err) |
| |
| chunked, err := crdr.NextBatch(tbl.NumRows()) |
| require.NoError(t, err) |
| |
| require.EqualValues(t, tbl.NumRows(), chunked.Len()) |
| |
| chunkList := tbl.Column(i).Data().Chunks() |
| offset := int64(0) |
| for _, chnk := range chunkList { |
| slc := array.NewChunkedSlice(chunked, offset, offset+int64(chnk.Len())) |
| defer slc.Release() |
| |
| assert.EqualValues(t, chnk.Len(), slc.Len()) |
| if len(slc.Chunks()) == 1 { |
| offset += int64(chnk.Len()) |
| assert.True(t, array.Equal(chnk, slc.Chunk(0))) |
| } |
| } |
| } |
| } |
| |
| func TestArrowReadWriteTableChunkedCols(t *testing.T) { |
| chunkSizes := []int{2, 4, 10, 2} |
| const totalLen = int64(18) |
| |
| rng := testutils.NewRandomArrayGenerator(0) |
| |
| arr := rng.Int32(totalLen, 0, math.MaxInt32/2, 0.9) |
| defer arr.Release() |
| |
| offset := int64(0) |
| chunks := make([]arrow.Array, 0) |
| for _, chnksize := range chunkSizes { |
| chk := array.NewSlice(arr, offset, offset+int64(chnksize)) |
| defer chk.Release() |
| chunks = append(chunks, chk) |
| } |
| |
| sc := arrow.NewSchema([]arrow.Field{{Name: "field", Type: arr.DataType(), Nullable: true}}, nil) |
| tbl := array.NewTable(sc, []arrow.Column{*arrow.NewColumn(sc.Field(0), arrow.NewChunked(arr.DataType(), chunks))}, -1) |
| defer tbl.Release() |
| |
| simpleRoundTrip(t, tbl, 2) |
| simpleRoundTrip(t, tbl, 10) |
| } |
| |
| // set this up for checking our expected results so we can test the functions |
| // that generate them which we export |
| func getLogicalType(typ arrow.DataType) schema.LogicalType { |
| switch typ.ID() { |
| case arrow.INT8: |
| return schema.NewIntLogicalType(8, true) |
| case arrow.UINT8: |
| return schema.NewIntLogicalType(8, false) |
| case arrow.INT16: |
| return schema.NewIntLogicalType(16, true) |
| case arrow.UINT16: |
| return schema.NewIntLogicalType(16, false) |
| case arrow.INT32: |
| return schema.NewIntLogicalType(32, true) |
| case arrow.UINT32: |
| return schema.NewIntLogicalType(32, false) |
| case arrow.INT64: |
| return schema.NewIntLogicalType(64, true) |
| case arrow.UINT64: |
| return schema.NewIntLogicalType(64, false) |
| case arrow.STRING: |
| return schema.StringLogicalType{} |
| case arrow.DATE32: |
| return schema.DateLogicalType{} |
| case arrow.DATE64: |
| return schema.DateLogicalType{} |
| case arrow.TIMESTAMP: |
| ts := typ.(*arrow.TimestampType) |
| adjustedUTC := len(ts.TimeZone) == 0 |
| switch ts.Unit { |
| case arrow.Microsecond: |
| return schema.NewTimestampLogicalType(adjustedUTC, schema.TimeUnitMicros) |
| case arrow.Millisecond: |
| return schema.NewTimestampLogicalType(adjustedUTC, schema.TimeUnitMillis) |
| case arrow.Nanosecond: |
| return schema.NewTimestampLogicalType(adjustedUTC, schema.TimeUnitNanos) |
| default: |
| panic("only milli, micro and nano units supported for arrow timestamp") |
| } |
| case arrow.TIME32: |
| return schema.NewTimeLogicalType(false, schema.TimeUnitMillis) |
| case arrow.TIME64: |
| ts := typ.(*arrow.Time64Type) |
| switch ts.Unit { |
| case arrow.Microsecond: |
| return schema.NewTimeLogicalType(false, schema.TimeUnitMicros) |
| case arrow.Nanosecond: |
| return schema.NewTimeLogicalType(false, schema.TimeUnitNanos) |
| default: |
| panic("only micro and nano seconds are supported for arrow TIME64") |
| } |
| case arrow.DECIMAL: |
| dec := typ.(*arrow.Decimal128Type) |
| return schema.NewDecimalLogicalType(dec.Precision, dec.Scale) |
| } |
| return schema.NoLogicalType{} |
| } |
| |
| func getPhysicalType(typ arrow.DataType) parquet.Type { |
| switch typ.ID() { |
| case arrow.BOOL: |
| return parquet.Types.Boolean |
| case arrow.UINT8, arrow.INT8, arrow.UINT16, arrow.INT16, arrow.UINT32, arrow.INT32: |
| return parquet.Types.Int32 |
| case arrow.INT64, arrow.UINT64: |
| return parquet.Types.Int64 |
| case arrow.FLOAT32: |
| return parquet.Types.Float |
| case arrow.FLOAT64: |
| return parquet.Types.Double |
| case arrow.BINARY, arrow.STRING: |
| return parquet.Types.ByteArray |
| case arrow.FIXED_SIZE_BINARY, arrow.DECIMAL: |
| return parquet.Types.FixedLenByteArray |
| case arrow.DATE32: |
| return parquet.Types.Int32 |
| case arrow.DATE64: |
| // convert to date32 internally |
| return parquet.Types.Int32 |
| case arrow.TIME32: |
| return parquet.Types.Int32 |
| case arrow.TIME64, arrow.TIMESTAMP: |
| return parquet.Types.Int64 |
| default: |
| return parquet.Types.Int32 |
| } |
| } |
| |
// Sentinel values used by makePrimitiveTestCol to fill single-value test
// columns, one per supported primitive type, plus the default column size.
const (
	boolTestValue = true
	uint8TestVal  = uint8(64)
	int8TestVal   = int8(-64)
	uint16TestVal = uint16(1024)
	int16TestVal  = int16(-1024)
	uint32TestVal = uint32(1024)
	int32TestVal  = int32(-1024)
	uint64TestVal = uint64(1024)
	int64TestVal  = int64(-1024)
	tsTestValue   = arrow.Timestamp(14695634030000)
	date32TestVal = arrow.Date32(170000)
	floatTestVal  = float32(2.1)
	doubleTestVal = float64(4.2)
	strTestVal    = "Test"

	// number of rows used by the single-column round-trip tests
	smallSize = 100
)
| |
// ParquetIOTestSuite groups the arrow<->parquet round-trip tests that share
// the helper methods below (schema construction, file writing, readback).
type ParquetIOTestSuite struct {
	suite.Suite
}
| |
// makeSimpleSchema builds a single-column parquet group schema ("column1")
// for the given arrow type and repetition, deriving the fixed byte width
// for fixed-size-binary and decimal types.
func (ps *ParquetIOTestSuite) makeSimpleSchema(typ arrow.DataType, rep parquet.Repetition) *schema.GroupNode {
	byteWidth := int32(-1)

	switch typ := typ.(type) {
	case *arrow.FixedSizeBinaryType:
		byteWidth = int32(typ.ByteWidth)
	case *arrow.Decimal128Type:
		// decimals are stored as fixed-len byte arrays sized by precision
		byteWidth = pqarrow.DecimalSize(typ.Precision)
	}

	// NOTE(review): the error from NewPrimitiveNodeLogical is discarded;
	// presumably the type combinations used by the tests are always valid —
	// confirm if new types are added.
	pnode, _ := schema.NewPrimitiveNodeLogical("column1", rep, getLogicalType(typ), getPhysicalType(typ), int(byteWidth), -1)
	return schema.MustGroup(schema.NewGroupNode("schema", parquet.Repetitions.Required, schema.FieldList{pnode}, -1))
}
| |
| func (ps *ParquetIOTestSuite) makePrimitiveTestCol(size int, typ arrow.DataType) arrow.Array { |
| switch typ.ID() { |
| case arrow.BOOL: |
| bldr := array.NewBooleanBuilder(memory.DefaultAllocator) |
| defer bldr.Release() |
| for i := 0; i < size; i++ { |
| bldr.Append(boolTestValue) |
| } |
| return bldr.NewArray() |
| case arrow.INT8: |
| bldr := array.NewInt8Builder(memory.DefaultAllocator) |
| defer bldr.Release() |
| for i := 0; i < size; i++ { |
| bldr.Append(int8TestVal) |
| } |
| return bldr.NewArray() |
| case arrow.UINT8: |
| bldr := array.NewUint8Builder(memory.DefaultAllocator) |
| defer bldr.Release() |
| for i := 0; i < size; i++ { |
| bldr.Append(uint8TestVal) |
| } |
| return bldr.NewArray() |
| case arrow.INT16: |
| bldr := array.NewInt16Builder(memory.DefaultAllocator) |
| defer bldr.Release() |
| for i := 0; i < size; i++ { |
| bldr.Append(int16TestVal) |
| } |
| return bldr.NewArray() |
| case arrow.UINT16: |
| bldr := array.NewUint16Builder(memory.DefaultAllocator) |
| defer bldr.Release() |
| for i := 0; i < size; i++ { |
| bldr.Append(uint16TestVal) |
| } |
| return bldr.NewArray() |
| case arrow.INT32: |
| bldr := array.NewInt32Builder(memory.DefaultAllocator) |
| defer bldr.Release() |
| for i := 0; i < size; i++ { |
| bldr.Append(int32TestVal) |
| } |
| return bldr.NewArray() |
| case arrow.UINT32: |
| bldr := array.NewUint32Builder(memory.DefaultAllocator) |
| defer bldr.Release() |
| for i := 0; i < size; i++ { |
| bldr.Append(uint32TestVal) |
| } |
| return bldr.NewArray() |
| case arrow.INT64: |
| bldr := array.NewInt64Builder(memory.DefaultAllocator) |
| defer bldr.Release() |
| for i := 0; i < size; i++ { |
| bldr.Append(int64TestVal) |
| } |
| return bldr.NewArray() |
| case arrow.UINT64: |
| bldr := array.NewUint64Builder(memory.DefaultAllocator) |
| defer bldr.Release() |
| for i := 0; i < size; i++ { |
| bldr.Append(uint64TestVal) |
| } |
| return bldr.NewArray() |
| case arrow.FLOAT32: |
| bldr := array.NewFloat32Builder(memory.DefaultAllocator) |
| defer bldr.Release() |
| for i := 0; i < size; i++ { |
| bldr.Append(floatTestVal) |
| } |
| return bldr.NewArray() |
| case arrow.FLOAT64: |
| bldr := array.NewFloat64Builder(memory.DefaultAllocator) |
| defer bldr.Release() |
| for i := 0; i < size; i++ { |
| bldr.Append(doubleTestVal) |
| } |
| return bldr.NewArray() |
| } |
| return nil |
| } |
| |
| func (ps *ParquetIOTestSuite) makeTestFile(typ arrow.DataType, arr arrow.Array, numChunks int) []byte { |
| sc := ps.makeSimpleSchema(typ, parquet.Repetitions.Required) |
| sink := encoding.NewBufferWriter(0, memory.DefaultAllocator) |
| writer := file.NewParquetWriter(sink, sc) |
| |
| ctx := pqarrow.NewArrowWriteContext(context.TODO(), nil) |
| rowGroupSize := arr.Len() / numChunks |
| |
| for i := 0; i < numChunks; i++ { |
| rgw := writer.AppendRowGroup() |
| cw, err := rgw.NextColumn() |
| ps.NoError(err) |
| |
| start := i * rowGroupSize |
| ps.NoError(pqarrow.WriteArrowToColumn(ctx, cw, array.NewSlice(arr, int64(start), int64(start+rowGroupSize)), nil, nil, false)) |
| cw.Close() |
| rgw.Close() |
| } |
| writer.Close() |
| buf := sink.Finish() |
| defer buf.Release() |
| return buf.Bytes() |
| } |
| |
| func (ps *ParquetIOTestSuite) createReader(data []byte) *pqarrow.FileReader { |
| rdr, err := file.NewParquetReader(bytes.NewReader(data)) |
| ps.NoError(err) |
| |
| reader, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator) |
| ps.NoError(err) |
| return reader |
| } |
| |
| func (ps *ParquetIOTestSuite) readTable(rdr *pqarrow.FileReader) arrow.Table { |
| tbl, err := rdr.ReadTable(context.TODO()) |
| ps.NoError(err) |
| ps.NotNil(tbl) |
| return tbl |
| } |
| |
| func (ps *ParquetIOTestSuite) checkSingleColumnRequiredTableRead(typ arrow.DataType, numChunks int) { |
| values := ps.makePrimitiveTestCol(smallSize, typ) |
| defer values.Release() |
| |
| data := ps.makeTestFile(typ, values, numChunks) |
| reader := ps.createReader(data) |
| |
| tbl := ps.readTable(reader) |
| defer tbl.Release() |
| |
| ps.EqualValues(1, tbl.NumCols()) |
| ps.EqualValues(smallSize, tbl.NumRows()) |
| |
| chunked := tbl.Column(0).Data() |
| ps.Len(chunked.Chunks(), 1) |
| ps.True(array.Equal(values, chunked.Chunk(0))) |
| } |
| |
| func (ps *ParquetIOTestSuite) checkSingleColumnRead(typ arrow.DataType, numChunks int) { |
| values := ps.makePrimitiveTestCol(smallSize, typ) |
| defer values.Release() |
| |
| data := ps.makeTestFile(typ, values, numChunks) |
| reader := ps.createReader(data) |
| |
| cr, err := reader.GetColumn(context.TODO(), 0) |
| ps.NoError(err) |
| |
| chunked, err := cr.NextBatch(smallSize) |
| ps.NoError(err) |
| defer chunked.Release() |
| |
| ps.Len(chunked.Chunks(), 1) |
| ps.True(array.Equal(values, chunked.Chunk(0))) |
| } |
| |
// TestDateTimeTypesReadWriteTable round-trips the date/time table through
// a full table write/read and compares against the "expected" variant
// (ns timestamps coerced to us), including schema metadata equality.
func (ps *ParquetIOTestSuite) TestDateTimeTypesReadWriteTable() {
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer mem.AssertSize(ps.T(), 0)

	toWrite := makeDateTimeTypesTable(mem, false, true)
	defer toWrite.Release()
	buf := writeTableToBuffer(ps.T(), toWrite, toWrite.NumRows(), pqarrow.DefaultWriterProps())
	defer buf.Release()

	reader := ps.createReader(buf.Bytes())
	tbl := ps.readTable(reader)
	defer tbl.Release()

	// expected differs from toWrite only in f3's unit (us instead of ns)
	expected := makeDateTimeTypesTable(mem, true, true)
	defer expected.Release()

	ps.Equal(expected.NumCols(), tbl.NumCols())
	ps.Equal(expected.NumRows(), tbl.NumRows())
	ps.Truef(expected.Schema().Equal(tbl.Schema()), "expected schema: %s\ngot schema: %s", expected.Schema(), tbl.Schema())

	for i := 0; i < int(expected.NumCols()); i++ {
		exChunk := expected.Column(i).Data()
		tblChunk := tbl.Column(i).Data()

		ps.Equal(len(exChunk.Chunks()), len(tblChunk.Chunks()))
		ps.Truef(array.Equal(exChunk.Chunk(0), tblChunk.Chunk(0)), "expected %s\ngot %s", exChunk.Chunk(0), tblChunk.Chunk(0))
	}
}
| |
// TestDateTimeTypesWithInt96ReadWriteTable round-trips the date/time table
// with deprecated INT96 timestamps enabled; in that mode no unit coercion
// occurs, so the table should read back identical to what was written.
func (ps *ParquetIOTestSuite) TestDateTimeTypesWithInt96ReadWriteTable() {
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer mem.AssertSize(ps.T(), 0)

	expected := makeDateTimeTypesTable(mem, false, true)
	defer expected.Release()
	buf := writeTableToBuffer(ps.T(), expected, expected.NumRows(), pqarrow.NewArrowWriterProperties(pqarrow.WithDeprecatedInt96Timestamps(true)))
	defer buf.Release()

	reader := ps.createReader(buf.Bytes())
	tbl := ps.readTable(reader)
	defer tbl.Release()

	ps.Equal(expected.NumCols(), tbl.NumCols())
	ps.Equal(expected.NumRows(), tbl.NumRows())
	ps.Truef(expected.Schema().Equal(tbl.Schema()), "expected schema: %s\ngot schema: %s", expected.Schema(), tbl.Schema())

	for i := 0; i < int(expected.NumCols()); i++ {
		exChunk := expected.Column(i).Data()
		tblChunk := tbl.Column(i).Data()

		ps.Equal(len(exChunk.Chunks()), len(tblChunk.Chunks()))
		ps.Truef(array.Equal(exChunk.Chunk(0), tblChunk.Chunk(0)), "expected %s\ngot %s", exChunk.Chunk(0), tblChunk.Chunk(0))
	}
}
| |
| func (ps *ParquetIOTestSuite) TestReadSingleColumnFile() { |
| types := []arrow.DataType{ |
| arrow.FixedWidthTypes.Boolean, |
| arrow.PrimitiveTypes.Uint8, |
| arrow.PrimitiveTypes.Int8, |
| arrow.PrimitiveTypes.Uint16, |
| arrow.PrimitiveTypes.Int16, |
| arrow.PrimitiveTypes.Uint32, |
| arrow.PrimitiveTypes.Int32, |
| arrow.PrimitiveTypes.Uint64, |
| arrow.PrimitiveTypes.Int64, |
| arrow.PrimitiveTypes.Float32, |
| arrow.PrimitiveTypes.Float64, |
| } |
| |
| nchunks := []int{1, 4} |
| |
| for _, n := range nchunks { |
| for _, dt := range types { |
| ps.Run(fmt.Sprintf("%s %d chunks", dt.Name(), n), func() { |
| ps.checkSingleColumnRead(dt, n) |
| }) |
| } |
| } |
| } |
| |
| func (ps *ParquetIOTestSuite) TestSingleColumnRequiredRead() { |
| types := []arrow.DataType{ |
| arrow.FixedWidthTypes.Boolean, |
| arrow.PrimitiveTypes.Uint8, |
| arrow.PrimitiveTypes.Int8, |
| arrow.PrimitiveTypes.Uint16, |
| arrow.PrimitiveTypes.Int16, |
| arrow.PrimitiveTypes.Uint32, |
| arrow.PrimitiveTypes.Int32, |
| arrow.PrimitiveTypes.Uint64, |
| arrow.PrimitiveTypes.Int64, |
| arrow.PrimitiveTypes.Float32, |
| arrow.PrimitiveTypes.Float64, |
| } |
| |
| nchunks := []int{1, 4} |
| |
| for _, n := range nchunks { |
| for _, dt := range types { |
| ps.Run(fmt.Sprintf("%s %d chunks", dt.Name(), n), func() { |
| ps.checkSingleColumnRequiredTableRead(dt, n) |
| }) |
| } |
| } |
| } |
| |
// TestReadDecimals writes raw big-endian byte-array decimals directly with
// the low-level column writer and verifies the pqarrow reader decodes them
// into the expected decimal128 values.
func (ps *ParquetIOTestSuite) TestReadDecimals() {
	// big-endian two's-complement encodings of the decimal values below;
	// the negative value uses the full 16-byte width
	bigEndian := []parquet.ByteArray{
		// 123456
		[]byte{1, 226, 64},
		// 987654
		[]byte{15, 18, 6},
		// -123456
		[]byte{255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 254, 29, 192},
	}

	bldr := array.NewDecimal128Builder(memory.DefaultAllocator, &arrow.Decimal128Type{Precision: 6, Scale: 3})
	defer bldr.Release()

	bldr.Append(decimal128.FromU64(123456))
	bldr.Append(decimal128.FromU64(987654))
	bldr.Append(decimal128.FromI64(-123456))

	expected := bldr.NewDecimal128Array()
	defer expected.Release()

	sc := schema.MustGroup(schema.NewGroupNode("schema", parquet.Repetitions.Required, schema.FieldList{
		schema.Must(schema.NewPrimitiveNodeLogical("decimals", parquet.Repetitions.Required, schema.NewDecimalLogicalType(6, 3), parquet.Types.ByteArray, -1, -1)),
	}, -1))

	sink := encoding.NewBufferWriter(0, memory.DefaultAllocator)
	writer := file.NewParquetWriter(sink, sc)

	// write the raw bytes through the low-level writer, bypassing pqarrow
	rgw := writer.AppendRowGroup()
	cw, _ := rgw.NextColumn()
	cw.(*file.ByteArrayColumnChunkWriter).WriteBatch(bigEndian, nil, nil)
	cw.Close()
	rgw.Close()
	writer.Close()

	rdr := ps.createReader(sink.Bytes())
	cr, err := rdr.GetColumn(context.TODO(), 0)
	ps.NoError(err)

	chunked, err := cr.NextBatch(smallSize)
	ps.NoError(err)
	defer chunked.Release()

	ps.Len(chunked.Chunks(), 1)
	ps.True(array.Equal(expected, chunked.Chunk(0)))
}
| |
| func (ps *ParquetIOTestSuite) writeColumn(sc *schema.GroupNode, values arrow.Array) []byte { |
| var buf bytes.Buffer |
| arrsc, err := pqarrow.FromParquet(schema.NewSchema(sc), nil, nil) |
| ps.NoError(err) |
| |
| writer, err := pqarrow.NewFileWriter(arrsc, &buf, parquet.NewWriterProperties(parquet.WithDictionaryDefault(false)), pqarrow.DefaultWriterProps()) |
| ps.NoError(err) |
| |
| writer.NewRowGroup() |
| ps.NoError(writer.WriteColumnData(values)) |
| ps.NoError(writer.Close()) |
| ps.NoError(writer.Close()) |
| |
| return buf.Bytes() |
| } |
| |
| func (ps *ParquetIOTestSuite) readAndCheckSingleColumnFile(data []byte, values arrow.Array) { |
| reader := ps.createReader(data) |
| cr, err := reader.GetColumn(context.TODO(), 0) |
| ps.NoError(err) |
| ps.NotNil(cr) |
| |
| chunked, err := cr.NextBatch(smallSize) |
| ps.NoError(err) |
| defer chunked.Release() |
| |
| ps.Len(chunked.Chunks(), 1) |
| ps.NotNil(chunked.Chunk(0)) |
| |
| ps.True(array.Equal(values, chunked.Chunk(0))) |
| } |
| |
// fullTypeList covers every data type the write tests exercise, including
// decimals at precisions spanning the int32/int64/fixed-len-byte-array
// storage boundaries.
var fullTypeList = []arrow.DataType{
	arrow.FixedWidthTypes.Boolean,
	arrow.PrimitiveTypes.Uint8,
	arrow.PrimitiveTypes.Int8,
	arrow.PrimitiveTypes.Uint16,
	arrow.PrimitiveTypes.Int16,
	arrow.PrimitiveTypes.Uint32,
	arrow.PrimitiveTypes.Int32,
	arrow.PrimitiveTypes.Uint64,
	arrow.PrimitiveTypes.Int64,
	arrow.FixedWidthTypes.Date32,
	arrow.PrimitiveTypes.Float32,
	arrow.PrimitiveTypes.Float64,
	arrow.BinaryTypes.String,
	arrow.BinaryTypes.Binary,
	&arrow.FixedSizeBinaryType{ByteWidth: 10},
	&arrow.Decimal128Type{Precision: 1, Scale: 0},
	&arrow.Decimal128Type{Precision: 5, Scale: 4},
	&arrow.Decimal128Type{Precision: 10, Scale: 9},
	&arrow.Decimal128Type{Precision: 19, Scale: 18},
	&arrow.Decimal128Type{Precision: 23, Scale: 22},
	&arrow.Decimal128Type{Precision: 27, Scale: 26},
	&arrow.Decimal128Type{Precision: 38, Scale: 37},
}
| |
| func (ps *ParquetIOTestSuite) TestSingleColumnRequiredWrite() { |
| for _, dt := range fullTypeList { |
| ps.Run(dt.Name(), func() { |
| values := testutils.RandomNonNull(dt, smallSize) |
| sc := ps.makeSimpleSchema(dt, parquet.Repetitions.Required) |
| data := ps.writeColumn(sc, values) |
| ps.readAndCheckSingleColumnFile(data, values) |
| }) |
| } |
| } |
| |
// roundTripTable writes expected to parquet (optionally embedding the arrow
// schema in the file metadata) and reads it back, asserting the first
// column's data is preserved. Struct columns get a special comparison that
// only checks the non-null regions.
func (ps *ParquetIOTestSuite) roundTripTable(expected arrow.Table, storeSchema bool) {
	var buf bytes.Buffer
	var props pqarrow.ArrowWriterProperties
	if storeSchema {
		props = pqarrow.NewArrowWriterProperties(pqarrow.WithStoreSchema())
	} else {
		props = pqarrow.DefaultWriterProps()
	}

	ps.Require().NoError(pqarrow.WriteTable(expected, &buf, expected.NumRows(), nil, props))

	reader := ps.createReader(buf.Bytes())
	tbl := ps.readTable(reader)
	defer tbl.Release()

	ps.Equal(expected.NumCols(), tbl.NumCols())
	ps.Equal(expected.NumRows(), tbl.NumRows())

	// only column 0 is compared; callers construct single-column tables
	exChunk := expected.Column(0).Data()
	tblChunk := tbl.Column(0).Data()

	ps.Equal(len(exChunk.Chunks()), len(tblChunk.Chunks()))
	if exChunk.DataType().ID() != arrow.STRUCT {
		ps.Truef(array.Equal(exChunk.Chunk(0), tblChunk.Chunk(0)), "expected: %s\ngot: %s", exChunk.Chunk(0), tblChunk.Chunk(0))
	} else {
		// current impl of ArrayEquals for structs doesn't correctly handle nulls in the parent
		// with a non-nullable child when comparing. Since after the round trip, the data in the
		// child will have the nulls, not the original data.
		ex := exChunk.Chunk(0)
		tb := tblChunk.Chunk(0)
		ps.Equal(ex.NullN(), tb.NullN())
		if ex.NullN() > 0 {
			// compare only the meaningful prefix of the validity bitmaps
			ps.Equal(ex.NullBitmapBytes()[:int(bitutil.BytesForBits(int64(ex.Len())))], tb.NullBitmapBytes()[:int(bitutil.BytesForBits(int64(tb.Len())))])
		}
		ps.Equal(ex.Len(), tb.Len())
		// only compare the non-null values
		ps.NoErrorf(bitutils.VisitSetBitRuns(ex.NullBitmapBytes(), int64(ex.Data().Offset()), int64(ex.Len()), func(pos, length int64) error {
			if !ps.True(array.SliceEqual(ex, pos, pos+length, tb, pos, pos+length)) {
				return errors.New("failed")
			}
			return nil
		}), "expected: %s\ngot: %s", ex, tb)
	}
}
| |
| func makeEmptyListsArray(size int) arrow.Array { |
| // allocate an offsets buffer with only zeros |
| offsetsNbytes := arrow.Int32Traits.BytesRequired(size + 1) |
| offsetsBuffer := make([]byte, offsetsNbytes) |
| |
| childBuffers := []*memory.Buffer{nil, nil} |
| childData := array.NewData(arrow.PrimitiveTypes.Float32, 0, childBuffers, nil, 0, 0) |
| defer childData.Release() |
| buffers := []*memory.Buffer{nil, memory.NewBufferBytes(offsetsBuffer)} |
| arrayData := array.NewData(arrow.ListOf(childData.DataType()), size, buffers, []arrow.ArrayData{childData}, 0, 0) |
| defer arrayData.Release() |
| return array.MakeFromData(arrayData) |
| } |
| |
// makeListArray wraps values in a list array of length size containing
// nullcount null lists. The list at index 1 is always a valid but empty
// list, so the values are spread across size-nullcount-1 non-empty entries.
// NOTE(review): assumes size > nullcount+1 and values.Len() is at least the
// number of non-empty entries; otherwise lengthPerEntry divides by zero or
// rounds to zero — confirm callers respect this.
func makeListArray(values arrow.Array, size, nullcount int) arrow.Array {
	nonNullEntries := size - nullcount - 1
	lengthPerEntry := values.Len() / nonNullEntries

	offsets := make([]byte, arrow.Int32Traits.BytesRequired(size+1))
	offsetsArr := arrow.Int32Traits.CastFromBytes(offsets)

	nullBitmap := make([]byte, int(bitutil.BytesForBits(int64(size))))

	curOffset := 0
	for i := 0; i < size; i++ {
		offsetsArr[i] = int32(curOffset)
		// even indices i with i/2 < nullcount are the null lists
		if !(((i % 2) == 0) && ((i / 2) < nullcount)) {
			// non-null list (list with index 1 is always empty)
			bitutil.SetBit(nullBitmap, i)
			if i != 1 {
				curOffset += lengthPerEntry
			}
		}
	}
	// final offset pins the end of the last list; any remainder from the
	// integer division above lands in the last non-empty entry
	offsetsArr[size] = int32(values.Len())

	listData := array.NewData(arrow.ListOf(values.DataType()), size,
		[]*memory.Buffer{memory.NewBufferBytes(nullBitmap), memory.NewBufferBytes(offsets)},
		[]arrow.ArrayData{values.Data()}, nullcount, 0)
	defer listData.Release()
	return array.NewListData(listData)
}
| |
| func prepareEmptyListsTable(size int) arrow.Table { |
| lists := makeEmptyListsArray(size) |
| defer lists.Release() |
| chunked := arrow.NewChunked(lists.DataType(), []arrow.Array{lists}) |
| defer chunked.Release() |
| return makeSimpleTable(chunked, true) |
| } |
| |
// prepareListTable builds a single-column table of list<dt> with size rows.
// nullCount controls how many null elements and/or null lists appear,
// gated by the nullableElems / nullableLists flags.
func prepareListTable(dt arrow.DataType, size int, nullableLists bool, nullableElems bool, nullCount int) arrow.Table {
	nc := nullCount
	if !nullableElems {
		nc = 0
	}
	values := testutils.RandomNullable(dt, size*size, nc)
	defer values.Release()
	// also test that slice offsets are respected
	// (defer binds its receiver at the defer statement, so the line above
	// releases the original array and the line below releases the slice)
	values = array.NewSlice(values, 5, int64(values.Len()))
	defer values.Release()

	if !nullableLists {
		nullCount = 0
	}
	lists := makeListArray(values, size, nullCount)
	defer lists.Release()

	chunked := arrow.NewChunked(lists.DataType(), []arrow.Array{lists})
	defer chunked.Release()

	// slice the chunked column as well so non-zero chunk offsets are exercised
	return makeSimpleTable(array.NewChunkedSlice(chunked, 3, int64(size)), nullableLists)
}
| |
| func prepareListOfListTable(dt arrow.DataType, size, nullCount int, nullableParentLists, nullableLists, nullableElems bool) arrow.Table { |
| nc := nullCount |
| if !nullableElems { |
| nc = 0 |
| } |
| |
| values := testutils.RandomNullable(dt, size*6, nc) |
| defer values.Release() |
| |
| if nullableLists { |
| nc = nullCount |
| } else { |
| nc = 0 |
| } |
| |
| lists := makeListArray(values, size*3, nc) |
| defer lists.Release() |
| |
| if !nullableParentLists { |
| nullCount = 0 |
| } |
| |
| parentLists := makeListArray(lists, size, nullCount) |
| defer parentLists.Release() |
| |
| chunked := arrow.NewChunked(parentLists.DataType(), []arrow.Array{parentLists}) |
| defer chunked.Release() |
| |
| return makeSimpleTable(chunked, nullableParentLists) |
| } |
| |
| func (ps *ParquetIOTestSuite) TestSingleEmptyListsColumnReadWrite() { |
| expected := prepareEmptyListsTable(smallSize) |
| buf := writeTableToBuffer(ps.T(), expected, smallSize, pqarrow.DefaultWriterProps()) |
| defer buf.Release() |
| |
| reader := ps.createReader(buf.Bytes()) |
| tbl := ps.readTable(reader) |
| defer tbl.Release() |
| |
| ps.EqualValues(expected.NumCols(), tbl.NumCols()) |
| ps.EqualValues(expected.NumRows(), tbl.NumRows()) |
| |
| exChunk := expected.Column(0).Data() |
| tblChunk := tbl.Column(0).Data() |
| |
| ps.Equal(len(exChunk.Chunks()), len(tblChunk.Chunks())) |
| ps.True(array.Equal(exChunk.Chunk(0), tblChunk.Chunk(0))) |
| } |
| |
| func (ps *ParquetIOTestSuite) TestSingleColumnOptionalReadWrite() { |
| for _, dt := range fullTypeList { |
| ps.Run(dt.Name(), func() { |
| values := testutils.RandomNullable(dt, smallSize, 10) |
| sc := ps.makeSimpleSchema(dt, parquet.Repetitions.Optional) |
| data := ps.writeColumn(sc, values) |
| ps.readAndCheckSingleColumnFile(data, values) |
| }) |
| } |
| } |
| |
| func (ps *ParquetIOTestSuite) TestSingleNullableListNullableColumnReadWrite() { |
| for _, dt := range fullTypeList { |
| ps.Run(dt.Name(), func() { |
| expected := prepareListTable(dt, smallSize, true, true, 10) |
| defer expected.Release() |
| ps.roundTripTable(expected, false) |
| }) |
| } |
| } |
| |
| func (ps *ParquetIOTestSuite) TestSingleRequiredListNullableColumnReadWrite() { |
| for _, dt := range fullTypeList { |
| ps.Run(dt.Name(), func() { |
| expected := prepareListTable(dt, smallSize, false, true, 10) |
| defer expected.Release() |
| ps.roundTripTable(expected, false) |
| }) |
| } |
| } |
| |
| func (ps *ParquetIOTestSuite) TestSingleNullableListRequiredColumnReadWrite() { |
| for _, dt := range fullTypeList { |
| ps.Run(dt.Name(), func() { |
| expected := prepareListTable(dt, smallSize, true, false, 10) |
| defer expected.Release() |
| ps.roundTripTable(expected, false) |
| }) |
| } |
| } |
| |
| func (ps *ParquetIOTestSuite) TestSingleRequiredListRequiredColumnReadWrite() { |
| for _, dt := range fullTypeList { |
| ps.Run(dt.Name(), func() { |
| expected := prepareListTable(dt, smallSize, false, false, 0) |
| defer expected.Release() |
| ps.roundTripTable(expected, false) |
| }) |
| } |
| } |
| |
| func (ps *ParquetIOTestSuite) TestSingleNullableListRequiredListRequiredColumnReadWrite() { |
| for _, dt := range fullTypeList { |
| ps.Run(dt.Name(), func() { |
| expected := prepareListOfListTable(dt, smallSize, 2, true, false, false) |
| defer expected.Release() |
| ps.roundTripTable(expected, false) |
| }) |
| } |
| } |
| |
| func (ps *ParquetIOTestSuite) TestSimpleStruct() { |
| links := arrow.StructOf(arrow.Field{Name: "Backward", Type: arrow.PrimitiveTypes.Int64, Nullable: true}, |
| arrow.Field{Name: "Forward", Type: arrow.PrimitiveTypes.Int64, Nullable: true}) |
| |
| bldr := array.NewStructBuilder(memory.DefaultAllocator, links) |
| defer bldr.Release() |
| |
| backBldr := bldr.FieldBuilder(0).(*array.Int64Builder) |
| forwardBldr := bldr.FieldBuilder(1).(*array.Int64Builder) |
| |
| bldr.Append(true) |
| backBldr.AppendNull() |
| forwardBldr.Append(20) |
| |
| bldr.Append(true) |
| backBldr.Append(10) |
| forwardBldr.Append(40) |
| |
| data := bldr.NewArray() |
| defer data.Release() |
| |
| tbl := array.NewTable(arrow.NewSchema([]arrow.Field{{Name: "links", Type: links}}, nil), |
| []arrow.Column{*arrow.NewColumn(arrow.Field{Name: "links", Type: links}, arrow.NewChunked(links, []arrow.Array{data}))}, -1) |
| defer tbl.Release() |
| |
| ps.roundTripTable(tbl, false) |
| } |
| |
| func (ps *ParquetIOTestSuite) TestSingleColumnNullableStruct() { |
| links := arrow.StructOf(arrow.Field{Name: "Backward", Type: arrow.PrimitiveTypes.Int64, Nullable: true}) |
| bldr := array.NewStructBuilder(memory.DefaultAllocator, links) |
| defer bldr.Release() |
| |
| backBldr := bldr.FieldBuilder(0).(*array.Int64Builder) |
| |
| bldr.AppendNull() |
| bldr.Append(true) |
| backBldr.Append(10) |
| |
| data := bldr.NewArray() |
| defer data.Release() |
| |
| tbl := array.NewTable(arrow.NewSchema([]arrow.Field{{Name: "links", Type: links, Nullable: true}}, nil), |
| []arrow.Column{*arrow.NewColumn(arrow.Field{Name: "links", Type: links, Nullable: true}, arrow.NewChunked(links, []arrow.Array{data}))}, -1) |
| defer tbl.Release() |
| |
| ps.roundTripTable(tbl, false) |
| } |
| |
// TestNestedRequiredFieldStruct round-trips a nullable struct column (4 of 8
// slots null) whose only child is a required int32 field.
func (ps *ParquetIOTestSuite) TestNestedRequiredFieldStruct() {
	intField := arrow.Field{Name: "int_array", Type: arrow.PrimitiveTypes.Int32}
	intBldr := array.NewInt32Builder(memory.DefaultAllocator)
	defer intBldr.Release()
	intBldr.AppendValues([]int32{0, 1, 2, 3, 4, 5, 7, 8}, nil)

	intArr := intBldr.NewArray()
	defer intArr.Release()

	// 0xCC = 0b11001100: rows 2,3,6,7 valid; rows 0,1,4,5 null (4 nulls)
	validity := memory.NewBufferBytes([]byte{0xCC})
	defer validity.Release()

	structField := arrow.Field{Name: "root", Type: arrow.StructOf(intField), Nullable: true}
	// build the struct array directly from buffers: 8 rows, 4 nulls
	structData := array.NewData(structField.Type, 8, []*memory.Buffer{validity}, []arrow.ArrayData{intArr.Data()}, 4, 0)
	defer structData.Release()
	stData := array.NewStructData(structData)
	defer stData.Release()

	tbl := array.NewTable(arrow.NewSchema([]arrow.Field{structField}, nil),
		[]arrow.Column{*arrow.NewColumn(structField,
			arrow.NewChunked(structField.Type, []arrow.Array{stData}))}, -1)
	defer tbl.Release()

	ps.roundTripTable(tbl, false)
}
| |
// TestNestedNullableField round-trips a nullable struct column whose child
// int32 field is itself nullable, with nulls at both nesting levels.
func (ps *ParquetIOTestSuite) TestNestedNullableField() {
	intField := arrow.Field{Name: "int_array", Type: arrow.PrimitiveTypes.Int32, Nullable: true}
	intBldr := array.NewInt32Builder(memory.DefaultAllocator)
	defer intBldr.Release()
	// child has its own nulls at indices 1, 3, 6
	intBldr.AppendValues([]int32{0, 1, 2, 3, 4, 5, 7, 8}, []bool{true, false, true, false, true, true, false, true})

	intArr := intBldr.NewArray()
	defer intArr.Release()

	// 0xCC = 0b11001100: struct rows 2,3,6,7 valid; rows 0,1,4,5 null
	validity := memory.NewBufferBytes([]byte{0xCC})
	defer validity.Release()

	structField := arrow.Field{Name: "root", Type: arrow.StructOf(intField), Nullable: true}
	// 8 rows, 4 parent-level nulls
	data := array.NewData(structField.Type, 8, []*memory.Buffer{validity}, []arrow.ArrayData{intArr.Data()}, 4, 0)
	defer data.Release()
	stData := array.NewStructData(data)
	defer stData.Release()

	tbl := array.NewTable(arrow.NewSchema([]arrow.Field{structField}, nil),
		[]arrow.Column{*arrow.NewColumn(structField,
			arrow.NewChunked(structField.Type, []arrow.Array{stData}))}, -1)
	defer tbl.Release()

	ps.roundTripTable(tbl, false)
}
| |
| func (ps *ParquetIOTestSuite) TestCanonicalNestedRoundTrip() { |
| docIdField := arrow.Field{Name: "DocID", Type: arrow.PrimitiveTypes.Int64} |
| linksField := arrow.Field{Name: "Links", Type: arrow.StructOf( |
| arrow.Field{Name: "Backward", Type: arrow.ListOf(arrow.PrimitiveTypes.Int64)}, |
| arrow.Field{Name: "Forward", Type: arrow.ListOf(arrow.PrimitiveTypes.Int64)}, |
| ), Nullable: true} |
| |
| nameStruct := arrow.StructOf( |
| arrow.Field{Name: "Language", Nullable: true, Type: arrow.ListOf( |
| arrow.StructOf(arrow.Field{Name: "Code", Type: arrow.BinaryTypes.String}, |
| arrow.Field{Name: "Country", Type: arrow.BinaryTypes.String, Nullable: true}))}, |
| arrow.Field{Name: "Url", Type: arrow.BinaryTypes.String, Nullable: true}) |
| |
| nameField := arrow.Field{Name: "Name", Type: arrow.ListOf(nameStruct)} |
| sc := arrow.NewSchema([]arrow.Field{docIdField, linksField, nameField}, nil) |
| |
| docIDArr, _, err := array.FromJSON(memory.DefaultAllocator, docIdField.Type, strings.NewReader("[10, 20]")) |
| ps.Require().NoError(err) |
| defer docIDArr.Release() |
| |
| linksIDArr, _, err := array.FromJSON(memory.DefaultAllocator, linksField.Type, strings.NewReader(`[{"Backward":[], "Forward":[20, 40, 60]}, {"Backward":[10, 30], "Forward": [80]}]`)) |
| ps.Require().NoError(err) |
| defer linksIDArr.Release() |
| |
| nameArr, _, err := array.FromJSON(memory.DefaultAllocator, nameField.Type, strings.NewReader(` |
| [[{"Language": [{"Code": "en_us", "Country": "us"}, |
| {"Code": "en_us", "Country": null}], |
| "Url": "http://A"}, |
| {"Url": "http://B", "Language": null}, |
| {"Language": [{"Code": "en-gb", "Country": "gb"}], "Url": null}], |
| [{"Url": "http://C", "Language": null}]]`)) |
| ps.Require().NoError(err) |
| defer nameArr.Release() |
| |
| expected := array.NewTable(sc, []arrow.Column{ |
| *arrow.NewColumn(docIdField, arrow.NewChunked(docIdField.Type, []arrow.Array{docIDArr})), |
| *arrow.NewColumn(linksField, arrow.NewChunked(linksField.Type, []arrow.Array{linksIDArr})), |
| *arrow.NewColumn(nameField, arrow.NewChunked(nameField.Type, []arrow.Array{nameArr})), |
| }, 2) |
| |
| ps.roundTripTable(expected, false) |
| } |
| |
| func (ps *ParquetIOTestSuite) TestFixedSizeList() { |
| bldr := array.NewFixedSizeListBuilder(memory.DefaultAllocator, 3, arrow.PrimitiveTypes.Int16) |
| defer bldr.Release() |
| |
| vb := bldr.ValueBuilder().(*array.Int16Builder) |
| |
| bldr.AppendValues([]bool{true, true, true}) |
| vb.AppendValues([]int16{1, 2, 3, 4, 5, 6, 7, 8, 9}, nil) |
| |
| data := bldr.NewArray() |
| field := arrow.Field{Name: "root", Type: data.DataType(), Nullable: true} |
| expected := array.NewTable(arrow.NewSchema([]arrow.Field{field}, nil), |
| []arrow.Column{*arrow.NewColumn(field, arrow.NewChunked(field.Type, []arrow.Array{data}))}, -1) |
| |
| ps.roundTripTable(expected, true) |
| } |
| |
| func (ps *ParquetIOTestSuite) TestNull() { |
| bldr := array.NewNullBuilder(memory.DefaultAllocator) |
| defer bldr.Release() |
| |
| bldr.AppendNull() |
| bldr.AppendNull() |
| bldr.AppendNull() |
| |
| data := bldr.NewArray() |
| defer data.Release() |
| |
| field := arrow.Field{Name: "x", Type: data.DataType(), Nullable: true} |
| expected := array.NewTable( |
| arrow.NewSchema([]arrow.Field{field}, nil), |
| []arrow.Column{*arrow.NewColumn(field, arrow.NewChunked(field.Type, []arrow.Array{data}))}, |
| -1, |
| ) |
| |
| ps.roundTripTable(expected, true) |
| } |
| |
| func TestParquetArrowIO(t *testing.T) { |
| suite.Run(t, new(ParquetIOTestSuite)) |
| } |
| |
| func TestBufferedRecWrite(t *testing.T) { |
| sc := arrow.NewSchema([]arrow.Field{ |
| {Name: "f32", Type: arrow.PrimitiveTypes.Float32, Nullable: true}, |
| {Name: "i32", Type: arrow.PrimitiveTypes.Int32, Nullable: true}, |
| {Name: "struct_i64_f64", Type: arrow.StructOf( |
| arrow.Field{Name: "i64", Type: arrow.PrimitiveTypes.Int64, Nullable: true}, |
| arrow.Field{Name: "f64", Type: arrow.PrimitiveTypes.Float64, Nullable: true})}, |
| }, nil) |
| |
| structData := array.NewData(sc.Field(2).Type, SIZELEN, |
| []*memory.Buffer{nil, nil}, |
| []arrow.ArrayData{testutils.RandomNullable(arrow.PrimitiveTypes.Int64, SIZELEN, 0).Data(), testutils.RandomNullable(arrow.PrimitiveTypes.Float64, SIZELEN, 0).Data()}, 0, 0) |
| defer structData.Release() |
| cols := []arrow.Array{ |
| testutils.RandomNullable(sc.Field(0).Type, SIZELEN, SIZELEN/5), |
| testutils.RandomNullable(sc.Field(1).Type, SIZELEN, SIZELEN/5), |
| array.NewStructData(structData), |
| } |
| |
| rec := array.NewRecord(sc, cols, SIZELEN) |
| defer rec.Release() |
| |
| var ( |
| buf bytes.Buffer |
| ) |
| |
| wr, err := pqarrow.NewFileWriter(sc, &buf, |
| parquet.NewWriterProperties(parquet.WithCompression(compress.Codecs.Snappy), parquet.WithDictionaryDefault(false), parquet.WithDataPageSize(100*1024)), |
| pqarrow.DefaultWriterProps()) |
| require.NoError(t, err) |
| |
| p1 := rec.NewSlice(0, SIZELEN/2) |
| defer p1.Release() |
| require.NoError(t, wr.WriteBuffered(p1)) |
| |
| p2 := rec.NewSlice(SIZELEN/2, SIZELEN) |
| defer p2.Release() |
| require.NoError(t, wr.WriteBuffered(p2)) |
| |
| wr.Close() |
| |
| rdr, err := file.NewParquetReader(bytes.NewReader(buf.Bytes())) |
| assert.NoError(t, err) |
| |
| assert.EqualValues(t, 1, rdr.NumRowGroups()) |
| assert.EqualValues(t, SIZELEN, rdr.NumRows()) |
| rdr.Close() |
| |
| tbl, err := pqarrow.ReadTable(context.Background(), bytes.NewReader(buf.Bytes()), nil, pqarrow.ArrowReadProperties{}, nil) |
| assert.NoError(t, err) |
| defer tbl.Release() |
| |
| assert.EqualValues(t, SIZELEN, tbl.NumRows()) |
| } |
| |
| func (ps *ParquetIOTestSuite) TestArrowMapTypeRoundTrip() { |
| bldr := array.NewMapBuilder(memory.DefaultAllocator, arrow.BinaryTypes.String, arrow.PrimitiveTypes.Int32, false) |
| defer bldr.Release() |
| |
| kb := bldr.KeyBuilder().(*array.StringBuilder) |
| ib := bldr.ItemBuilder().(*array.Int32Builder) |
| |
| bldr.Append(true) |
| kb.AppendValues([]string{"Fee", "Fi", "Fo", "Fum"}, nil) |
| ib.AppendValues([]int32{1, 2, 3, 4}, nil) |
| |
| bldr.Append(true) |
| kb.AppendValues([]string{"Fee", "Fi", "Fo"}, nil) |
| ib.AppendValues([]int32{5, 4, 3}, nil) |
| |
| bldr.AppendNull() |
| |
| bldr.Append(true) |
| kb.AppendValues([]string{"Fo", "Fi", "Fee"}, nil) |
| ib.AppendValues([]int32{-1, 2, 3}, []bool{false, true, true}) |
| |
| arr := bldr.NewArray() |
| defer arr.Release() |
| |
| fld := arrow.Field{Name: "mapped", Type: arr.DataType(), Nullable: true} |
| tbl := array.NewTable(arrow.NewSchema([]arrow.Field{fld}, nil), |
| []arrow.Column{*arrow.NewColumn(fld, arrow.NewChunked(arr.DataType(), []arrow.Array{arr}))}, -1) |
| defer tbl.Release() |
| |
| ps.roundTripTable(tbl, true) |
| } |
| |
| func TestWriteTableMemoryAllocation(t *testing.T) { |
| allocator := memory.NewCheckedAllocator(memory.DefaultAllocator) |
| sc := arrow.NewSchema([]arrow.Field{ |
| {Name: "f32", Type: arrow.PrimitiveTypes.Float32, Nullable: true}, |
| {Name: "i32", Type: arrow.PrimitiveTypes.Int32, Nullable: true}, |
| {Name: "struct_i64_f64", Type: arrow.StructOf( |
| arrow.Field{Name: "i64", Type: arrow.PrimitiveTypes.Int64, Nullable: true}, |
| arrow.Field{Name: "f64", Type: arrow.PrimitiveTypes.Float64, Nullable: true})}, |
| {Name: "arr_i64", Type: arrow.ListOf(arrow.PrimitiveTypes.Int64)}, |
| }, nil) |
| |
| bld := array.NewRecordBuilder(allocator, sc) |
| bld.Field(0).(*array.Float32Builder).Append(1.0) |
| bld.Field(1).(*array.Int32Builder).Append(1) |
| sbld := bld.Field(2).(*array.StructBuilder) |
| sbld.Append(true) |
| sbld.FieldBuilder(0).(*array.Int64Builder).Append(1) |
| sbld.FieldBuilder(1).(*array.Float64Builder).Append(1.0) |
| abld := bld.Field(3).(*array.ListBuilder) |
| abld.Append(true) |
| abld.ValueBuilder().(*array.Int64Builder).Append(2) |
| |
| rec := bld.NewRecord() |
| bld.Release() |
| |
| var buf bytes.Buffer |
| wr, err := pqarrow.NewFileWriter(sc, &buf, |
| parquet.NewWriterProperties(parquet.WithCompression(compress.Codecs.Snappy)), |
| pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(allocator))) |
| require.NoError(t, err) |
| |
| require.NoError(t, wr.Write(rec)) |
| rec.Release() |
| wr.Close() |
| |
| require.Zero(t, allocator.CurrentAlloc()) |
| } |