| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package encoding_test |
| |
| import ( |
| "encoding/binary" |
| "strconv" |
| "testing" |
| |
| "github.com/apache/arrow/go/v6/arrow" |
| "github.com/apache/arrow/go/v6/arrow/memory" |
| "github.com/apache/arrow/go/v6/parquet" |
| "github.com/apache/arrow/go/v6/parquet/internal/encoding" |
| "github.com/apache/arrow/go/v6/parquet/internal/utils" |
| "github.com/stretchr/testify/assert" |
| ) |
| |
// generateLevels builds a slice of levels for exercising the level
// encoders and decoders.
//
// For every rep in [minRepeat, maxRepeat] it walks the level values
// 0, 1, 3, 7, ... (that is, 2^k - 1) for as long as the value does not exceed
// maxLevel, and appends each of those values 1<<rep times in a row.
//
// The returned slice is never nil; it is empty when no value qualifies
// (for example when minRepeat > maxRepeat or maxLevel is negative).
func generateLevels(minRepeat, maxRepeat int, maxLevel int16) []int16 {
	ret := make([]int16, 0)
	limit := int(maxLevel)
	// for each repetition count up to max repeat
	for rep := minRepeat; rep <= maxRepeat; rep++ {
		repCount := 1 << rep
		// generate levels for repetition count up to max level
		//
		// The level is tracked as an int rather than an int16: computing
		// (2 << bwidth) - 1 in int16 wraps around to -1 once the value passes
		// math.MaxInt16, which would keep the loop condition true forever when
		// maxLevel is 32767.
		for val, bwidth := 0, 0; val <= limit; bwidth++ {
			for i := 0; i < repCount; i++ {
				// val <= limit <= math.MaxInt16 here, so the conversion is lossless
				ret = append(ret, int16(val))
			}
			val = (2 << bwidth) - 1
		}
	}
	return ret
}
| |
| func encodeLevels(t *testing.T, enc parquet.Encoding, maxLvl int16, numLevels int, input []int16) []byte { |
| var ( |
| encoder encoding.LevelEncoder |
| lvlCount = 0 |
| buf = encoding.NewBufferWriter(2*numLevels, memory.DefaultAllocator) |
| ) |
| |
| if enc == parquet.Encodings.RLE { |
| buf.SetOffset(arrow.Int32SizeBytes) |
| // leave space to write the rle length value |
| encoder.Init(enc, maxLvl, buf) |
| lvlCount, _ = encoder.Encode(input) |
| buf.SetOffset(0) |
| arrow.Int32Traits.CastFromBytes(buf.Bytes())[0] = utils.ToLEInt32(int32(encoder.Len())) |
| } else { |
| encoder.Init(enc, maxLvl, buf) |
| lvlCount, _ = encoder.Encode(input) |
| } |
| |
| assert.Equal(t, numLevels, lvlCount) |
| return buf.Bytes() |
| } |
| |
| func verifyDecodingLvls(t *testing.T, enc parquet.Encoding, maxLvl int16, input []int16, buf []byte) { |
| var ( |
| decoder encoding.LevelDecoder |
| lvlCount = 0 |
| numLevels = len(input) |
| output = make([]int16, numLevels) |
| decodeCount = 4 |
| numInnerLevels = numLevels / decodeCount |
| ) |
| |
| // decode levels and test with multiple decode calls |
| _, err := decoder.SetData(enc, maxLvl, numLevels, buf) |
| assert.NoError(t, err) |
| // try multiple decoding on a single setdata call |
| for ct := 0; ct < decodeCount; ct++ { |
| offset := ct * numInnerLevels |
| lvlCount, _ = decoder.Decode(output[:numInnerLevels]) |
| assert.Equal(t, numInnerLevels, lvlCount) |
| assert.Equal(t, input[offset:offset+numInnerLevels], output[:numInnerLevels]) |
| } |
| |
| // check the remaining levels |
| var ( |
| levelsCompleted = decodeCount * (numLevels / decodeCount) |
| remaining = numLevels - levelsCompleted |
| ) |
| |
| if remaining > 0 { |
| lvlCount, _ = decoder.Decode(output[:remaining]) |
| assert.Equal(t, remaining, lvlCount) |
| assert.Equal(t, input[levelsCompleted:], output[:remaining]) |
| } |
| // test decode zero values |
| lvlCount, _ = decoder.Decode(output[:1]) |
| assert.Zero(t, lvlCount) |
| } |
| |
| func verifyDecodingMultipleSetData(t *testing.T, enc parquet.Encoding, max int16, input []int16, buf [][]byte) { |
| var ( |
| decoder encoding.LevelDecoder |
| lvlCount = 0 |
| setdataCount = len(buf) |
| numLevels = len(input) / setdataCount |
| output = make([]int16, numLevels) |
| ) |
| |
| for ct := 0; ct < setdataCount; ct++ { |
| offset := ct * numLevels |
| assert.Len(t, output, numLevels) |
| _, err := decoder.SetData(enc, max, numLevels, buf[ct]) |
| assert.NoError(t, err) |
| lvlCount, _ = decoder.Decode(output) |
| assert.Equal(t, numLevels, lvlCount) |
| assert.Equal(t, input[offset:offset+numLevels], output) |
| } |
| } |
| |
| func TestLevelsDecodeMultipleBitWidth(t *testing.T) { |
| t.Parallel() |
| // Test levels with maximum bit-width from 1 to 8 |
| // increase the repetition count for each iteration by a factor of 2 |
| var ( |
| minRepeat = 0 |
| maxRepeat = 7 // 128 |
| maxBitWidth = 8 |
| input []int16 |
| buf []byte |
| encodings = [2]parquet.Encoding{parquet.Encodings.RLE, parquet.Encodings.BitPacked} |
| ) |
| |
| for _, enc := range encodings { |
| t.Run(enc.String(), func(t *testing.T) { |
| // bitpacked requires a sequence of at least 8 |
| if enc == parquet.Encodings.BitPacked { |
| minRepeat = 3 |
| } |
| // for each max bit width |
| for bitWidth := 1; bitWidth <= maxBitWidth; bitWidth++ { |
| t.Run(strconv.Itoa(bitWidth), func(t *testing.T) { |
| max := int16((1 << bitWidth) - 1) |
| // generate levels |
| input = generateLevels(minRepeat, maxRepeat, max) |
| assert.NotPanics(t, func() { |
| buf = encodeLevels(t, enc, max, len(input), input) |
| }) |
| assert.NotPanics(t, func() { |
| verifyDecodingLvls(t, enc, max, input, buf) |
| }) |
| }) |
| } |
| }) |
| } |
| } |
| |
| func TestLevelsDecodeMultipleSetData(t *testing.T) { |
| t.Parallel() |
| |
| var ( |
| minRepeat = 3 |
| maxRepeat = 7 |
| bitWidth = 8 |
| maxLevel = int16((1 << bitWidth) - 1) |
| encodings = [2]parquet.Encoding{parquet.Encodings.RLE, parquet.Encodings.BitPacked} |
| ) |
| |
| input := generateLevels(minRepeat, maxRepeat, maxLevel) |
| |
| var ( |
| numLevels = len(input) |
| setdataFactor = 8 |
| splitLevelSize = numLevels / setdataFactor |
| buf = make([][]byte, setdataFactor) |
| ) |
| |
| for _, enc := range encodings { |
| t.Run(enc.String(), func(t *testing.T) { |
| for rf := 0; rf < setdataFactor; rf++ { |
| offset := rf * splitLevelSize |
| assert.NotPanics(t, func() { |
| buf[rf] = encodeLevels(t, enc, maxLevel, splitLevelSize, input[offset:offset+splitLevelSize]) |
| }) |
| } |
| assert.NotPanics(t, func() { |
| verifyDecodingMultipleSetData(t, enc, maxLevel, input, buf) |
| }) |
| }) |
| } |
| } |
| |
| func TestMinimumBufferSize(t *testing.T) { |
| t.Parallel() |
| |
| const numToEncode = 1024 |
| levels := make([]int16, numToEncode) |
| |
| for idx := range levels { |
| if idx%9 == 0 { |
| levels[idx] = 0 |
| } else { |
| levels[idx] = 1 |
| } |
| } |
| |
| output := encoding.NewBufferWriter(0, memory.DefaultAllocator) |
| |
| var encoder encoding.LevelEncoder |
| encoder.Init(parquet.Encodings.RLE, 1, output) |
| count, _ := encoder.Encode(levels) |
| assert.Equal(t, numToEncode, count) |
| } |
| |
| func TestMinimumBufferSize2(t *testing.T) { |
| t.Parallel() |
| |
| // test the worst case for bit_width=2 consisting of |
| // LiteralRun(size=8) |
| // RepeatedRun(size=8) |
| // LiteralRun(size=8) |
| // ... |
| const numToEncode = 1024 |
| levels := make([]int16, numToEncode) |
| |
| for idx := range levels { |
| // This forces a literal run of 00000001 |
| // followed by eight 1s |
| if (idx % 16) < 7 { |
| levels[idx] = 0 |
| } else { |
| levels[idx] = 1 |
| } |
| } |
| |
| for bitWidth := int16(1); bitWidth <= 8; bitWidth++ { |
| output := encoding.NewBufferWriter(0, memory.DefaultAllocator) |
| |
| var encoder encoding.LevelEncoder |
| encoder.Init(parquet.Encodings.RLE, bitWidth, output) |
| count, _ := encoder.Encode(levels) |
| assert.Equal(t, numToEncode, count) |
| } |
| } |
| |
| func TestEncodeDecodeLevels(t *testing.T) { |
| t.Parallel() |
| const numToEncode = 2048 |
| levels := make([]int16, numToEncode) |
| numones := 0 |
| for idx := range levels { |
| if (idx % 16) < 7 { |
| levels[idx] = 0 |
| } else { |
| levels[idx] = 1 |
| numones++ |
| } |
| } |
| |
| output := encoding.NewBufferWriter(0, memory.DefaultAllocator) |
| |
| var encoder encoding.LevelEncoder |
| encoder.Init(parquet.Encodings.RLE, 1, output) |
| count, _ := encoder.Encode(levels) |
| assert.Equal(t, numToEncode, count) |
| encoder.Flush() |
| |
| buf := output.Bytes() |
| var prefix [4]byte |
| binary.LittleEndian.PutUint32(prefix[:], uint32(len(buf))) |
| |
| var decoder encoding.LevelDecoder |
| _, err := decoder.SetData(parquet.Encodings.RLE, 1, numToEncode, append(prefix[:], buf...)) |
| assert.NoError(t, err) |
| |
| var levelOut [numToEncode]int16 |
| total, vals := decoder.Decode(levelOut[:]) |
| assert.EqualValues(t, numToEncode, total) |
| assert.EqualValues(t, numones, vals) |
| assert.Equal(t, levels, levelOut[:]) |
| } |