blob: 36c0aa3a52746bc0094cb6c6bc7b63876369b816 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package encoding_test
import (
"fmt"
"reflect"
"testing"
"unsafe"
"github.com/apache/arrow/go/v6/arrow"
"github.com/apache/arrow/go/v6/arrow/bitutil"
"github.com/apache/arrow/go/v6/arrow/memory"
"github.com/apache/arrow/go/v6/parquet"
"github.com/apache/arrow/go/v6/parquet/internal/encoding"
"github.com/apache/arrow/go/v6/parquet/internal/testutils"
"github.com/apache/arrow/go/v6/parquet/schema"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
)
// nodeFactory constructs a primitive schema node from a name, a repetition and
// a field id; the schema.New*Node constructors below all fit this shape.
type nodeFactory func(string, parquet.Repetition, int32) *schema.PrimitiveNode

// createNodeFactory returns the schema node constructor matching the Go value
// type t. FixedLenByteArray nodes are always created with a type length of 12.
// A nil factory is returned when t has no registered constructor.
func createNodeFactory(t reflect.Type) nodeFactory {
	fixedLen := func(name string, rep parquet.Repetition, field int32) *schema.PrimitiveNode {
		return schema.NewFixedLenByteArrayNode(name, rep, 12, field)
	}
	byType := map[reflect.Type]nodeFactory{
		reflect.TypeOf(true):                        schema.NewBooleanNode,
		reflect.TypeOf(int32(0)):                    schema.NewInt32Node,
		reflect.TypeOf(int64(0)):                    schema.NewInt64Node,
		reflect.TypeOf(parquet.Int96{}):             schema.NewInt96Node,
		reflect.TypeOf(float32(0)):                  schema.NewFloat32Node,
		reflect.TypeOf(float64(0)):                  schema.NewFloat64Node,
		reflect.TypeOf(parquet.ByteArray{}):         schema.NewByteArrayNode,
		reflect.TypeOf(parquet.FixedLenByteArray{}): fixedLen,
	}
	// A missing key yields the zero value, i.e. a nil nodeFactory.
	return byType[t]
}
// initdata builds the value slice to encode ("draws") and the slice to decode
// into for the Go element type t.
//
// Fixed-width types are views over drawbuf/decodebuf. ByteArray and
// FixedLenByteArray get freshly allocated slices, and the byte buffers are
// ignored for them. The first nvals draws are filled by testutils.InitValues
// (using heap as scratch). That prefix is then tiled so the draws slice holds
// repeats copies of it. Both results are sliced to nvals*repeats elements.
// (nil, nil) is returned for an unsupported t.
func initdata(t reflect.Type, drawbuf, decodebuf []byte, nvals, repeats int, heap *memory.Buffer) (interface{}, interface{}) {
	total := nvals * repeats

	var draws, out interface{}
	switch t {
	case reflect.TypeOf(true):
		// Reinterpret the raw byte buffers as []bool in place (relies on a
		// one-byte bool, as the gc toolchain uses).
		draws = *(*[]bool)(unsafe.Pointer(&drawbuf))
		out = *(*[]bool)(unsafe.Pointer(&decodebuf))
	case reflect.TypeOf(int32(0)):
		draws = arrow.Int32Traits.CastFromBytes(drawbuf)
		out = arrow.Int32Traits.CastFromBytes(decodebuf)
	case reflect.TypeOf(int64(0)):
		draws = arrow.Int64Traits.CastFromBytes(drawbuf)
		out = arrow.Int64Traits.CastFromBytes(decodebuf)
	case reflect.TypeOf(parquet.Int96{}):
		draws = parquet.Int96Traits.CastFromBytes(drawbuf)
		out = parquet.Int96Traits.CastFromBytes(decodebuf)
	case reflect.TypeOf(float32(0)):
		draws = arrow.Float32Traits.CastFromBytes(drawbuf)
		out = arrow.Float32Traits.CastFromBytes(decodebuf)
	case reflect.TypeOf(float64(0)):
		draws = arrow.Float64Traits.CastFromBytes(drawbuf)
		out = arrow.Float64Traits.CastFromBytes(decodebuf)
	case reflect.TypeOf(parquet.ByteArray{}):
		draws = make([]parquet.ByteArray, total)
		out = make([]parquet.ByteArray, total)
	case reflect.TypeOf(parquet.FixedLenByteArray{}):
		draws = make([]parquet.FixedLenByteArray, total)
		out = make([]parquet.FixedLenByteArray, total)
	default:
		return nil, nil
	}

	drawVal := reflect.ValueOf(draws)
	seed := drawVal.Slice(0, nvals)
	testutils.InitValues(seed.Interface(), heap)
	// Tile the seeded prefix into each following nvals-sized window.
	for rep := 1; rep < repeats; rep++ {
		reflect.Copy(drawVal.Slice(nvals*rep, nvals*(rep+1)), seed)
	}
	return drawVal.Slice(0, total).Interface(), reflect.ValueOf(out).Slice(0, total).Interface()
}
// encode feeds vals to enc through the Put method of the typed encoder
// interface that matches vals' slice type. Any other value type is ignored.
func encode(enc encoding.TypedEncoder, vals interface{}) {
	switch typed := vals.(type) {
	case []parquet.FixedLenByteArray:
		enc.(encoding.FixedLenByteArrayEncoder).Put(typed)
	case []parquet.ByteArray:
		enc.(encoding.ByteArrayEncoder).Put(typed)
	case []float64:
		enc.(encoding.Float64Encoder).Put(typed)
	case []float32:
		enc.(encoding.Float32Encoder).Put(typed)
	case []parquet.Int96:
		enc.(encoding.Int96Encoder).Put(typed)
	case []int64:
		enc.(encoding.Int64Encoder).Put(typed)
	case []int32:
		enc.(encoding.Int32Encoder).Put(typed)
	case []bool:
		enc.(encoding.BooleanEncoder).Put(typed)
	}
}
// encodeSpaced feeds vals to enc through the PutSpaced method of the typed
// encoder interface that matches vals' slice type. validBits and
// validBitsOffset are forwarded unchanged. Any other value type is ignored.
func encodeSpaced(enc encoding.TypedEncoder, vals interface{}, validBits []byte, validBitsOffset int64) {
	switch typed := vals.(type) {
	case []parquet.FixedLenByteArray:
		enc.(encoding.FixedLenByteArrayEncoder).PutSpaced(typed, validBits, validBitsOffset)
	case []parquet.ByteArray:
		enc.(encoding.ByteArrayEncoder).PutSpaced(typed, validBits, validBitsOffset)
	case []float64:
		enc.(encoding.Float64Encoder).PutSpaced(typed, validBits, validBitsOffset)
	case []float32:
		enc.(encoding.Float32Encoder).PutSpaced(typed, validBits, validBitsOffset)
	case []parquet.Int96:
		enc.(encoding.Int96Encoder).PutSpaced(typed, validBits, validBitsOffset)
	case []int64:
		enc.(encoding.Int64Encoder).PutSpaced(typed, validBits, validBitsOffset)
	case []int32:
		enc.(encoding.Int32Encoder).PutSpaced(typed, validBits, validBitsOffset)
	case []bool:
		enc.(encoding.BooleanEncoder).PutSpaced(typed, validBits, validBitsOffset)
	}
}
// decode fills out using the Decode method of the typed decoder interface that
// matches out's slice type, and returns that decoder's count and error.
// An unsupported out type yields an error instead of a silent (0, nil), so
// a mis-typed buffer is reported rather than looking like an empty decode.
func decode(dec encoding.TypedDecoder, out interface{}) (int, error) {
	switch v := out.(type) {
	case []bool:
		return dec.(encoding.BooleanDecoder).Decode(v)
	case []int32:
		return dec.(encoding.Int32Decoder).Decode(v)
	case []int64:
		return dec.(encoding.Int64Decoder).Decode(v)
	case []parquet.Int96:
		return dec.(encoding.Int96Decoder).Decode(v)
	case []float32:
		return dec.(encoding.Float32Decoder).Decode(v)
	case []float64:
		return dec.(encoding.Float64Decoder).Decode(v)
	case []parquet.ByteArray:
		return dec.(encoding.ByteArrayDecoder).Decode(v)
	case []parquet.FixedLenByteArray:
		return dec.(encoding.FixedLenByteArrayDecoder).Decode(v)
	default:
		return 0, fmt.Errorf("decode: unsupported output slice type %T", out)
	}
}
// decodeSpaced fills out using the DecodeSpaced method of the typed decoder
// interface that matches out's slice type. nullCount, validBits and
// validBitsOffset are forwarded unchanged. It returns that decoder's count and
// error, or an error for an unsupported out type (rather than a silent (0, nil)).
func decodeSpaced(dec encoding.TypedDecoder, out interface{}, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
	switch v := out.(type) {
	case []bool:
		return dec.(encoding.BooleanDecoder).DecodeSpaced(v, nullCount, validBits, validBitsOffset)
	case []int32:
		return dec.(encoding.Int32Decoder).DecodeSpaced(v, nullCount, validBits, validBitsOffset)
	case []int64:
		return dec.(encoding.Int64Decoder).DecodeSpaced(v, nullCount, validBits, validBitsOffset)
	case []parquet.Int96:
		return dec.(encoding.Int96Decoder).DecodeSpaced(v, nullCount, validBits, validBitsOffset)
	case []float32:
		return dec.(encoding.Float32Decoder).DecodeSpaced(v, nullCount, validBits, validBitsOffset)
	case []float64:
		return dec.(encoding.Float64Decoder).DecodeSpaced(v, nullCount, validBits, validBitsOffset)
	case []parquet.ByteArray:
		return dec.(encoding.ByteArrayDecoder).DecodeSpaced(v, nullCount, validBits, validBitsOffset)
	case []parquet.FixedLenByteArray:
		return dec.(encoding.FixedLenByteArrayDecoder).DecodeSpaced(v, nullCount, validBits, validBitsOffset)
	default:
		return 0, fmt.Errorf("decodeSpaced: unsupported output slice type %T", out)
	}
}
// BaseEncodingTestSuite round-trips generated values of a single Go element
// type (typ) through the non-dictionary encoders and decoders. typ must be set
// when the suite is constructed (see TestEncoding). The remaining fields are
// filled in by SetupSuite, SetupTest and initData.
type BaseEncodingTestSuite struct {
suite.Suite
// descr is the Optional column descriptor built by SetupTest via nodeFactory.
descr *schema.Column
// typeLen caches descr.TypeLength(); set by SetupTest.
typeLen int
// mem is the allocator used for the suite's buffers and its decoders
// (memory.DefaultAllocator, assigned in SetupSuite).
mem memory.Allocator
// typ is the Go element type under test, e.g. reflect.TypeOf(int32(0)).
typ reflect.Type
// nvalues is the total value count (nvalues*repeats) from the last initData call.
nvalues int
// heap is scratch storage handed to testutils.InitValues when generating values.
heap *memory.Buffer
// inputBytes and outputBytes back draws and decodeBuf for the fixed-width
// types; initdata allocates separate slices for ByteArray/FixedLenByteArray.
inputBytes *memory.Buffer
outputBytes *memory.Buffer
// nodeFactory builds the schema node for typ; set by SetupSuite.
nodeFactory nodeFactory
// draws holds the generated values (a slice of typ) that get encoded.
draws interface{}
// decodeBuf is the slice of typ that decoders write into.
decodeBuf interface{}
}
// SetupSuite picks the schema node factory for b.typ and allocates the
// resizable scratch, input and output buffers from the default allocator.
func (b *BaseEncodingTestSuite) SetupSuite() {
	b.nodeFactory = createNodeFactory(b.typ)

	alloc := memory.DefaultAllocator
	b.mem = alloc
	b.inputBytes = memory.NewResizableBuffer(alloc)
	b.outputBytes = memory.NewResizableBuffer(alloc)
	b.heap = memory.NewResizableBuffer(alloc)
}
// TearDownSuite releases the buffers allocated in SetupSuite.
func (b *BaseEncodingTestSuite) TearDownSuite() {
	for _, buf := range []*memory.Buffer{b.inputBytes, b.outputBytes, b.heap} {
		buf.Release()
	}
}
// SetupTest builds a fresh Optional column descriptor named "name" for each
// test and caches its type length.
func (b *BaseEncodingTestSuite) SetupTest() {
	node := b.nodeFactory("name", parquet.Repetitions.Optional, -1)
	b.descr = schema.NewColumn(node, 0, 0)
	b.typeLen = int(b.descr.TypeLength())
}
// initData sizes and zeroes the input/output buffers for nvalues*repeats
// elements of b.typ, then generates nvalues draws repeated `repeats` times.
func (b *BaseEncodingTestSuite) initData(nvalues, repeats int) {
	total := nvalues * repeats
	byteLen := total * int(b.typ.Size())
	b.nvalues = total

	for _, buf := range []*memory.Buffer{b.inputBytes, b.outputBytes} {
		buf.ResizeNoShrink(byteLen)
		memory.Set(buf.Buf(), 0)
	}
	b.draws, b.decodeBuf = initdata(b.typ, b.inputBytes.Buf(), b.outputBytes.Buf(), nvalues, repeats, b.heap)
}
// encodeTestData encodes the first b.nvalues draws with a non-dictionary
// encoder for e. It asserts the encoder reports e and the column's physical
// type, and returns the flushed buffer and any flush error.
func (b *BaseEncodingTestSuite) encodeTestData(e parquet.Encoding) (encoding.Buffer, error) {
	ptype := testutils.TypeToParquetType(b.typ)
	enc := encoding.NewEncoder(ptype, e, false, b.descr, memory.DefaultAllocator)
	b.Equal(e, enc.Encoding())
	b.Equal(b.descr.PhysicalType(), enc.Type())

	values := reflect.ValueOf(b.draws).Slice(0, b.nvalues).Interface()
	encode(enc, values)
	return enc.FlushValues()
}
// decodeTestData decodes buf with a decoder for e into b.decodeBuf. It asserts
// the decoder's encoding/type, that decoding succeeds and yields b.nvalues
// values, and that those values equal the original draws.
func (b *BaseEncodingTestSuite) decodeTestData(e parquet.Encoding, buf []byte) {
	dec := encoding.NewDecoder(testutils.TypeToParquetType(b.typ), e, b.descr, b.mem)
	b.Equal(e, dec.Encoding())
	b.Equal(b.descr.PhysicalType(), dec.Type())
	dec.SetData(b.nvalues, buf)

	decoded, err := decode(dec, b.decodeBuf)
	// Previously the error was dropped; surface it so failures point at the decoder.
	b.NoError(err)
	b.Equal(b.nvalues, decoded)
	b.Equal(reflect.ValueOf(b.draws).Slice(0, b.nvalues).Interface(), reflect.ValueOf(b.decodeBuf).Slice(0, b.nvalues).Interface())
}
// encodeTestDataSpaced encodes the first b.nvalues draws with a non-dictionary
// encoder for e via PutSpaced, using the validity bitmap validBits starting at
// bit validBitsOffset. Like encodeTestData, it asserts the encoder's reported
// encoding and physical type, then returns the flushed buffer and any flush error.
func (b *BaseEncodingTestSuite) encodeTestDataSpaced(e parquet.Encoding, validBits []byte, validBitsOffset int64) (encoding.Buffer, error) {
	enc := encoding.NewEncoder(testutils.TypeToParquetType(b.typ), e, false, b.descr, memory.DefaultAllocator)
	b.Equal(e, enc.Encoding())
	b.Equal(b.descr.PhysicalType(), enc.Type())
	encodeSpaced(enc, reflect.ValueOf(b.draws).Slice(0, b.nvalues).Interface(), validBits, validBitsOffset)
	return enc.FlushValues()
}
// decodeTestDataSpaced decodes buf, which holds b.nvalues-nullCount encoded
// values, into b.decodeBuf via DecodeSpaced with the given validity bitmap.
// It asserts the decoder's encoding/type, that decoding succeeds and reports
// b.nvalues slots, and that every slot marked valid matches the original draw.
func (b *BaseEncodingTestSuite) decodeTestDataSpaced(e parquet.Encoding, nullCount int, buf []byte, validBits []byte, validBitsOffset int64) {
	dec := encoding.NewDecoder(testutils.TypeToParquetType(b.typ), e, b.descr, b.mem)
	b.Equal(e, dec.Encoding())
	b.Equal(b.descr.PhysicalType(), dec.Type())
	dec.SetData(b.nvalues-nullCount, buf)

	decoded, err := decodeSpaced(dec, b.decodeBuf, nullCount, validBits, validBitsOffset)
	b.NoError(err)
	b.Equal(b.nvalues, decoded)

	drawval := reflect.ValueOf(b.draws)
	decodeval := reflect.ValueOf(b.decodeBuf)
	for j := 0; j < b.nvalues; j++ {
		// Null slots hold unspecified contents, so only valid slots are compared.
		if bitutil.BitIsSet(validBits, int(validBitsOffset)+j) {
			b.Equal(drawval.Index(j).Interface(), decodeval.Index(j).Interface())
		}
	}
}
// checkRoundTrip encodes the current draws with e and verifies that decoding
// the result reproduces them. An encode error is reported as a test failure
// instead of being discarded (which previously led to a nil-buffer panic).
func (b *BaseEncodingTestSuite) checkRoundTrip(e parquet.Encoding) {
	buf, err := b.encodeTestData(e)
	if !b.NoError(err) {
		return
	}
	defer buf.Release()
	b.decodeTestData(e, buf.Bytes())
}
// checkRoundTripSpaced encodes the current draws with e through the spaced
// API, counts the cleared bits in validBits over the b.nvalues-bit window at
// validBitsOffset, and verifies the spaced decode. An encode error is reported
// as a test failure instead of being discarded.
func (b *BaseEncodingTestSuite) checkRoundTripSpaced(e parquet.Encoding, validBits []byte, validBitsOffset int64) {
	buf, err := b.encodeTestDataSpaced(e, validBits, validBitsOffset)
	if !b.NoError(err) {
		return
	}
	defer buf.Release()

	nullCount := 0
	for i := 0; i < b.nvalues; i++ {
		if bitutil.BitIsNotSet(validBits, int(validBitsOffset)+i) {
			nullCount++
		}
	}
	b.decodeTestDataSpaced(e, nullCount, buf.Bytes(), validBits, validBitsOffset)
}
// TestBasicRoundTrip round-trips 10000 generated values through PLAIN encoding.
func (b *BaseEncodingTestSuite) TestBasicRoundTrip() {
	const count = 10000
	b.initData(count, 1)
	b.checkRoundTrip(parquet.Encodings.Plain)
}
// TestDeltaEncodingRoundTrip round-trips values through DELTA_BINARY_PACKED for
// int32/int64, and expects every other type to panic when that encoding is requested.
func (b *BaseEncodingTestSuite) TestDeltaEncodingRoundTrip() {
	b.initData(10000, 1)

	enc := parquet.Encodings.DeltaBinaryPacked
	isInteger := b.typ == reflect.TypeOf(int32(0)) || b.typ == reflect.TypeOf(int64(0))
	if !isInteger {
		b.Panics(func() { b.checkRoundTrip(enc) })
		return
	}
	b.checkRoundTrip(enc)
}
// TestDeltaLengthByteArrayRoundTrip round-trips ByteArray values through
// DELTA_LENGTH_BYTE_ARRAY, and expects every other type to panic.
func (b *BaseEncodingTestSuite) TestDeltaLengthByteArrayRoundTrip() {
	b.initData(10000, 1)

	enc := parquet.Encodings.DeltaLengthByteArray
	if b.typ != reflect.TypeOf(parquet.ByteArray{}) {
		b.Panics(func() { b.checkRoundTrip(enc) })
		return
	}
	b.checkRoundTrip(enc)
}
// TestDeltaByteArrayRoundTrip round-trips ByteArray values through
// DELTA_BYTE_ARRAY, and expects every other type to panic when that same
// encoding is requested.
func (b *BaseEncodingTestSuite) TestDeltaByteArrayRoundTrip() {
	b.initData(10000, 1)
	switch b.typ {
	case reflect.TypeOf(parquet.ByteArray{}):
		b.checkRoundTrip(parquet.Encodings.DeltaByteArray)
	default:
		// Probe DeltaByteArray itself. The previous copy-pasted DeltaLengthByteArray
		// only duplicated TestDeltaLengthByteArrayRoundTrip and never tested this encoding.
		b.Panics(func() { b.checkRoundTrip(parquet.Encodings.DeltaByteArray) })
	}
}
// TestSpacedRoundTrip round-trips values through the spaced encode/decode APIs
// using random validity bitmaps of varying length, bit offset and density.
// It covers PLAIN plus whichever delta encodings the type supports.
//
// Note: the generator is given 1-nullProb, and the subtest name reports that
// same 1-nullProb value as the "null" fraction.
func (b *BaseEncodingTestSuite) TestSpacedRoundTrip() {
	exec := func(vals, repeats int, validBitsOffset int64, nullProb float64) {
		b.Run(fmt.Sprintf("%d vals %d repeats %d offset %0.3f null", vals, repeats, validBitsOffset, 1-nullProb), func() {
			b.initData(vals, repeats)

			size := int64(b.nvalues) + validBitsOffset
			r := testutils.NewRandomArrayGenerator(1923)
			arr := r.Uint8(size, 0, 100, 1-nullProb)
			// Only the validity bitmap is used; release the array once this
			// subtest is done with it instead of leaking the reference.
			defer arr.Release()

			validBits := arr.NullBitmapBytes()
			// A nil bitmap (no nulls generated) skips the spaced checks.
			if validBits != nil {
				b.checkRoundTripSpaced(parquet.Encodings.Plain, validBits, validBitsOffset)
				switch b.typ {
				case reflect.TypeOf(int32(0)), reflect.TypeOf(int64(0)):
					b.checkRoundTripSpaced(parquet.Encodings.DeltaBinaryPacked, validBits, validBitsOffset)
				case reflect.TypeOf(parquet.ByteArray{}):
					b.checkRoundTripSpaced(parquet.Encodings.DeltaLengthByteArray, validBits, validBitsOffset)
					b.checkRoundTripSpaced(parquet.Encodings.DeltaByteArray, validBits, validBitsOffset)
				}
			}
		})
	}

	const (
		avx512Size    = 64
		simdSize      = avx512Size
		multiSimdSize = simdSize * 33
	)

	for _, nullProb := range []float64{0.001, 0.1, 0.5, 0.9, 0.999} {
		// Test with both size and offset up to 3 simd blocks.
		for i := 1; i < simdSize*3; i++ {
			exec(i, 1, 0, nullProb)
			exec(i, 1, int64(i+1), nullProb)
		}
		// Large block and offset.
		exec(multiSimdSize, 1, 0, nullProb)
		exec(multiSimdSize+33, 1, 0, nullProb)
		exec(multiSimdSize, 1, 33, nullProb)
		exec(multiSimdSize+33, 1, 33, nullProb)
	}
}
// TestEncoding runs BaseEncodingTestSuite once for each supported Go value type.
func TestEncoding(t *testing.T) {
	cases := []struct {
		name string
		typ  reflect.Type
	}{
		{"Bool", reflect.TypeOf(true)},
		{"Int32", reflect.TypeOf(int32(0))},
		{"Int64", reflect.TypeOf(int64(0))},
		{"Float32", reflect.TypeOf(float32(0))},
		{"Float64", reflect.TypeOf(float64(0))},
		{"Int96", reflect.TypeOf(parquet.Int96{})},
		{"ByteArray", reflect.TypeOf(parquet.ByteArray{})},
		{"FixedLenByteArray", reflect.TypeOf(parquet.FixedLenByteArray{})},
	}

	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			suite.Run(t, &BaseEncodingTestSuite{typ: tc.typ})
		})
	}
}
// DictionaryEncodingTestSuite reuses BaseEncodingTestSuite's setup and data
// generation, and adds a dictionary-encoding round trip (its own
// TestBasicRoundTrip/checkRoundTrip shadow the embedded ones).
type DictionaryEncodingTestSuite struct {
BaseEncodingTestSuite
}
// encodeTestDataDict encodes the first d.nvalues draws with a dictionary
// encoder for e, asserting it reports PlainDict and the column's physical type.
// It returns the serialized dictionary, the flushed index buffer and the number
// of dictionary entries (callers in this file release both buffers).
func (d *DictionaryEncodingTestSuite) encodeTestDataDict(e parquet.Encoding) (dictBuffer, indices encoding.Buffer, numEntries int) {
	enc := encoding.NewEncoder(testutils.TypeToParquetType(d.typ), e, true, d.descr, memory.DefaultAllocator).(encoding.DictEncoder)
	d.Equal(parquet.Encodings.PlainDict, enc.Encoding())
	d.Equal(d.descr.PhysicalType(), enc.Type())
	encode(enc, reflect.ValueOf(d.draws).Slice(0, d.nvalues).Interface())

	dictBuffer = memory.NewResizableBuffer(d.mem)
	dictBuffer.Resize(enc.DictEncodedSize())
	enc.WriteDict(dictBuffer.Bytes())

	// Report flush failures instead of silently returning a possibly nil buffer.
	indices, err := enc.FlushValues()
	d.NoError(err)
	return dictBuffer, indices, enc.NumEntries()
}
// encodeTestDataDictSpaced is the PutSpaced counterpart of encodeTestDataDict.
// It dictionary-encodes the first d.nvalues draws using the validity bitmap
// validBits at validBitsOffset, with the same PlainDict/type assertions. It
// returns the serialized dictionary, the flushed index buffer and the entry count.
func (d *DictionaryEncodingTestSuite) encodeTestDataDictSpaced(e parquet.Encoding, validBits []byte, validBitsOffset int64) (dictBuffer, indices encoding.Buffer, numEntries int) {
	enc := encoding.NewEncoder(testutils.TypeToParquetType(d.typ), e, true, d.descr, memory.DefaultAllocator).(encoding.DictEncoder)
	d.Equal(parquet.Encodings.PlainDict, enc.Encoding())
	d.Equal(d.descr.PhysicalType(), enc.Type())
	encodeSpaced(enc, reflect.ValueOf(d.draws).Slice(0, d.nvalues).Interface(), validBits, validBitsOffset)

	dictBuffer = memory.NewResizableBuffer(d.mem)
	dictBuffer.Resize(enc.DictEncodedSize())
	enc.WriteDict(dictBuffer.Bytes())

	indices, err := enc.FlushValues()
	d.NoError(err)
	return dictBuffer, indices, enc.NumEntries()
}
// checkRoundTrip dictionary-encodes the draws twice: once via Put, and once via
// PutSpaced with an all-valid bitmap. It checks that both produce identical
// indices, then decodes the indices against the PLAIN-decoded dictionary with
// both Decode and DecodeSpaced, verifying each pass reproduces the draws.
func (d *DictionaryEncodingTestSuite) checkRoundTrip() {
	dictBuffer, indices, numEntries := d.encodeTestDataDict(parquet.Encodings.Plain)
	defer dictBuffer.Release()
	defer indices.Release()

	validBits := make([]byte, int(bitutil.BytesForBits(int64(d.nvalues)))+1)
	memory.Set(validBits, 255)

	spacedBuffer, indicesSpaced, _ := d.encodeTestDataDictSpaced(parquet.Encodings.Plain, validBits, 0)
	defer spacedBuffer.Release()
	defer indicesSpaced.Release()
	d.Equal(indices.Bytes(), indicesSpaced.Bytes())

	dictDecoder := encoding.NewDecoder(testutils.TypeToParquetType(d.typ), parquet.Encodings.Plain, d.descr, d.mem)
	d.Equal(d.descr.PhysicalType(), dictDecoder.Type())
	dictDecoder.SetData(numEntries, dictBuffer.Bytes())

	decoder := encoding.NewDictDecoder(testutils.TypeToParquetType(d.typ), d.descr, d.mem)
	decoder.SetDict(dictDecoder)

	expected := reflect.ValueOf(d.draws).Slice(0, d.nvalues).Interface()
	out := reflect.ValueOf(d.decodeBuf)

	decoder.SetData(d.nvalues, indices.Bytes())
	decoded, err := decode(decoder, d.decodeBuf)
	d.NoError(err)
	d.Equal(d.nvalues, decoded)
	d.Equal(expected, out.Slice(0, d.nvalues).Interface())

	// Clear the output before the spaced pass. Otherwise a DecodeSpaced that
	// wrote nothing would still "pass" on the first pass's leftover results.
	zero := reflect.Zero(out.Type().Elem())
	for i := 0; i < out.Len(); i++ {
		out.Index(i).Set(zero)
	}

	decoder.SetData(d.nvalues, indices.Bytes())
	decoded, err = decodeSpaced(decoder, d.decodeBuf, 0, validBits, 0)
	d.NoError(err)
	d.Equal(d.nvalues, decoded)
	d.Equal(expected, out.Slice(0, d.nvalues).Interface())
}
// TestBasicRoundTrip dictionary round-trips 2500 generated values, each
// sequence repeated twice.
func (d *DictionaryEncodingTestSuite) TestBasicRoundTrip() {
	const (
		baseCount = 2500
		repeats   = 2
	)
	d.initData(baseCount, repeats)
	d.checkRoundTrip()
}
// TestDictEncoding runs DictionaryEncodingTestSuite for each type in the table
// below (Bool and Int96 are not included).
func TestDictEncoding(t *testing.T) {
	cases := []struct {
		name string
		typ  reflect.Type
	}{
		{"Int32", reflect.TypeOf(int32(0))},
		{"Int64", reflect.TypeOf(int64(0))},
		{"Float32", reflect.TypeOf(float32(0))},
		{"Float64", reflect.TypeOf(float64(0))},
		{"ByteArray", reflect.TypeOf(parquet.ByteArray{})},
		{"FixedLenByteArray", reflect.TypeOf(parquet.FixedLenByteArray{})},
	}

	for _, tc := range cases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			suite.Run(t, &DictionaryEncodingTestSuite{BaseEncodingTestSuite: BaseEncodingTestSuite{typ: tc.typ}})
		})
	}
}
// TestWriteDeltaBitPackedInt32 checks the exact DELTA_BINARY_PACKED bytes for
// small int32 inputs and decodes those bytes back. It then checks that 1000
// random values can be decoded progressively, 100 at a time. Flush/decode
// errors and decoded counts are asserted, matching the Int64 variant.
func TestWriteDeltaBitPackedInt32(t *testing.T) {
	column := schema.NewColumn(schema.NewInt32Node("int32", parquet.Repetitions.Required, -1), 0, 0)

	tests := []struct {
		name     string
		toencode []int32
		expected []byte
	}{
		{"simple 12345", []int32{1, 2, 3, 4, 5}, []byte{128, 1, 4, 5, 2, 2, 0, 0, 0, 0}},
		{"odd vals", []int32{7, 5, 3, 1, 2, 3, 4, 5}, []byte{128, 1, 4, 8, 14, 3, 2, 0, 0, 0, 192, 63, 0, 0, 0, 0, 0, 0}},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			enc := encoding.NewEncoder(parquet.Types.Int32, parquet.Encodings.DeltaBinaryPacked, false, column, memory.DefaultAllocator)
			enc.(encoding.Int32Encoder).Put(tt.toencode)
			buf, err := enc.FlushValues()
			if !assert.NoError(t, err) {
				return
			}
			defer buf.Release()
			assert.Equal(t, tt.expected, buf.Bytes())

			dec := encoding.NewDecoder(parquet.Types.Int32, parquet.Encodings.DeltaBinaryPacked, column, memory.DefaultAllocator)
			dec.(encoding.Int32Decoder).SetData(len(tt.toencode), tt.expected)
			out := make([]int32, len(tt.toencode))
			decoded, err := dec.(encoding.Int32Decoder).Decode(out)
			assert.NoError(t, err)
			assert.Equal(t, len(tt.toencode), decoded)
			assert.Equal(t, tt.toencode, out)
		})
	}

	t.Run("test progressive decoding", func(t *testing.T) {
		values := make([]int32, 1000)
		testutils.FillRandomInt32(0, values)

		enc := encoding.NewEncoder(parquet.Types.Int32, parquet.Encodings.DeltaBinaryPacked, false, column, memory.DefaultAllocator)
		enc.(encoding.Int32Encoder).Put(values)
		buf, err := enc.FlushValues()
		if !assert.NoError(t, err) {
			return
		}
		defer buf.Release()

		dec := encoding.NewDecoder(parquet.Types.Int32, parquet.Encodings.DeltaBinaryPacked, column, memory.DefaultAllocator)
		dec.(encoding.Int32Decoder).SetData(len(values), buf.Bytes())

		valueBuf := make([]int32, 100)
		for i, j := 0, len(valueBuf); j <= len(values); i, j = i+len(valueBuf), j+len(valueBuf) {
			decoded, err := dec.(encoding.Int32Decoder).Decode(valueBuf)
			assert.NoError(t, err)
			assert.Equal(t, len(valueBuf), decoded)
			assert.Equalf(t, values[i:j], valueBuf, "indexes %d:%d", i, j)
		}
	})
}
// TestWriteDeltaBitPackedInt64 checks the exact DELTA_BINARY_PACKED bytes for
// small int64 inputs and decodes those bytes back. It then checks that 1000
// random values can be decoded progressively, 100 at a time. Flush/decode
// errors and decoded counts are asserted in every case.
func TestWriteDeltaBitPackedInt64(t *testing.T) {
	column := schema.NewColumn(schema.NewInt64Node("int64", parquet.Repetitions.Required, -1), 0, 0)

	tests := []struct {
		name     string
		toencode []int64
		expected []byte
	}{
		{"simple 12345", []int64{1, 2, 3, 4, 5}, []byte{128, 1, 4, 5, 2, 2, 0, 0, 0, 0}},
		{"odd vals", []int64{7, 5, 3, 1, 2, 3, 4, 5}, []byte{128, 1, 4, 8, 14, 3, 2, 0, 0, 0, 192, 63, 0, 0, 0, 0, 0, 0}},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			enc := encoding.NewEncoder(parquet.Types.Int64, parquet.Encodings.DeltaBinaryPacked, false, column, memory.DefaultAllocator)
			enc.(encoding.Int64Encoder).Put(tt.toencode)
			buf, err := enc.FlushValues()
			if !assert.NoError(t, err) {
				return
			}
			defer buf.Release()
			assert.Equal(t, tt.expected, buf.Bytes())

			dec := encoding.NewDecoder(parquet.Types.Int64, parquet.Encodings.DeltaBinaryPacked, column, memory.DefaultAllocator)
			dec.(encoding.Int64Decoder).SetData(len(tt.toencode), tt.expected)
			out := make([]int64, len(tt.toencode))
			decoded, err := dec.(encoding.Int64Decoder).Decode(out)
			assert.NoError(t, err)
			assert.Equal(t, len(tt.toencode), decoded)
			assert.Equal(t, tt.toencode, out)
		})
	}

	t.Run("test progressive decoding", func(t *testing.T) {
		values := make([]int64, 1000)
		testutils.FillRandomInt64(0, values)

		enc := encoding.NewEncoder(parquet.Types.Int64, parquet.Encodings.DeltaBinaryPacked, false, column, memory.DefaultAllocator)
		enc.(encoding.Int64Encoder).Put(values)
		buf, err := enc.FlushValues()
		if !assert.NoError(t, err) {
			return
		}
		defer buf.Release()

		dec := encoding.NewDecoder(parquet.Types.Int64, parquet.Encodings.DeltaBinaryPacked, column, memory.DefaultAllocator)
		dec.(encoding.Int64Decoder).SetData(len(values), buf.Bytes())

		valueBuf := make([]int64, 100)
		for i, j := 0, len(valueBuf); j <= len(values); i, j = i+len(valueBuf), j+len(valueBuf) {
			decoded, err := dec.(encoding.Int64Decoder).Decode(valueBuf)
			assert.NoError(t, err)
			assert.Equal(t, len(valueBuf), decoded)
			assert.Equalf(t, values[i:j], valueBuf, "indexes %d:%d", i, j)
		}
	})
}
// TestDeltaLengthByteArrayEncoding checks the exact DELTA_LENGTH_BYTE_ARRAY
// bytes for four short strings (their raw ASCII bytes end the expected
// buffer), then decodes those bytes back and compares. Flush and decode
// errors are asserted rather than discarded.
func TestDeltaLengthByteArrayEncoding(t *testing.T) {
	column := schema.NewColumn(schema.NewByteArrayNode("bytearray", parquet.Repetitions.Required, -1), 0, 0)
	test := []parquet.ByteArray{[]byte("Hello"), []byte("World"), []byte("Foobar"), []byte("ABCDEF")}
	expected := []byte{128, 1, 4, 4, 10, 0, 1, 0, 0, 0, 2, 0, 0, 0, 72, 101, 108, 108, 111, 87, 111, 114, 108, 100, 70, 111, 111, 98, 97, 114, 65, 66, 67, 68, 69, 70}

	enc := encoding.NewEncoder(parquet.Types.ByteArray, parquet.Encodings.DeltaLengthByteArray, false, column, memory.DefaultAllocator)
	enc.(encoding.ByteArrayEncoder).Put(test)
	buf, err := enc.FlushValues()
	if !assert.NoError(t, err) {
		return
	}
	defer buf.Release()
	assert.Equal(t, expected, buf.Bytes())

	dec := encoding.NewDecoder(parquet.Types.ByteArray, parquet.Encodings.DeltaLengthByteArray, column, nil)
	dec.SetData(len(test), expected)
	out := make([]parquet.ByteArray, len(test))
	decoded, err := dec.(encoding.ByteArrayDecoder).Decode(out)
	assert.NoError(t, err)
	assert.Equal(t, len(test), decoded)
	assert.Equal(t, test, out)
}
// TestDeltaByteArrayEncoding checks the exact DELTA_BYTE_ARRAY bytes for four
// short strings, then decodes those bytes back and compares. The encoder and
// decoder are built with nil column and allocator arguments. Flush and decode
// errors are asserted rather than discarded.
func TestDeltaByteArrayEncoding(t *testing.T) {
	test := []parquet.ByteArray{[]byte("Hello"), []byte("World"), []byte("Foobar"), []byte("ABCDEF")}
	expected := []byte{128, 1, 4, 4, 0, 0, 0, 0, 0, 0, 128, 1, 4, 4, 10, 0, 1, 0, 0, 0, 2, 0, 0, 0, 72, 101, 108, 108, 111, 87, 111, 114, 108, 100, 70, 111, 111, 98, 97, 114, 65, 66, 67, 68, 69, 70}

	enc := encoding.NewEncoder(parquet.Types.ByteArray, parquet.Encodings.DeltaByteArray, false, nil, nil)
	enc.(encoding.ByteArrayEncoder).Put(test)
	buf, err := enc.FlushValues()
	if !assert.NoError(t, err) {
		return
	}
	defer buf.Release()
	assert.Equal(t, expected, buf.Bytes())

	dec := encoding.NewDecoder(parquet.Types.ByteArray, parquet.Encodings.DeltaByteArray, nil, nil)
	dec.SetData(len(test), expected)
	out := make([]parquet.ByteArray, len(test))
	decoded, err := dec.(encoding.ByteArrayDecoder).Decode(out)
	assert.NoError(t, err)
	assert.Equal(t, len(test), decoded)
	assert.Equal(t, test, out)
}