| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| // Package utils contains various internal utilities for the parquet library |
| // that aren't intended to be exposed to external consumers such as interfaces |
| // and bitmap readers/writers including the RLE encoder/decoder and so on. |
| package utils |
| |
| import ( |
| "bytes" |
| "encoding/binary" |
| "io" |
| "math" |
| |
| "github.com/apache/arrow/go/v6/arrow/bitutil" |
| "github.com/apache/arrow/go/v6/parquet" |
| "golang.org/x/xerrors" |
| ) |
| |
| //go:generate go run ../../../arrow/_tools/tmpl/main.go -i -data=physical_types.tmpldata typed_rle_dict.gen.go.tmpl |
| |
| const ( |
| MaxValuesPerLiteralRun = (1 << 6) * 8 |
| ) |
| |
| func MinBufferSize(bitWidth int) int { |
| maxLiteralRunSize := 1 + bitutil.BytesForBits(int64(MaxValuesPerLiteralRun*bitWidth)) |
| maxRepeatedRunSize := binary.MaxVarintLen32 + bitutil.BytesForBits(int64(bitWidth)) |
| return int(Max(maxLiteralRunSize, maxRepeatedRunSize)) |
| } |
| |
| func MaxBufferSize(width, numValues int) int { |
| bytesPerRun := width |
| numRuns := int(bitutil.BytesForBits(int64(numValues))) |
| literalMaxSize := numRuns + (numRuns * bytesPerRun) |
| |
| minRepeatedRunSize := 1 + int(bitutil.BytesForBits(int64(width))) |
| repeatedMaxSize := int(bitutil.BytesForBits(int64(numValues))) * minRepeatedRunSize |
| |
| return MaxInt(literalMaxSize, repeatedMaxSize) |
| } |
| |
| // Utility classes to do run length encoding (RLE) for fixed bit width values. If runs |
| // are sufficiently long, RLE is used, otherwise, the values are just bit-packed |
| // (literal encoding). |
| // For both types of runs, there is a byte-aligned indicator which encodes the length |
| // of the run and the type of the run. |
| // This encoding has the benefit that when there aren't any long enough runs, values |
| // are always decoded at fixed (can be precomputed) bit offsets OR both the value and |
| // the run length are byte aligned. This allows for very efficient decoding |
| // implementations. |
| // The encoding is: |
| // encoded-block := run* |
| // run := literal-run | repeated-run |
| // literal-run := literal-indicator < literal bytes > |
| // repeated-run := repeated-indicator < repeated value. padded to byte boundary > |
| // literal-indicator := varint_encode( number_of_groups << 1 | 1) |
| // repeated-indicator := varint_encode( number_of_repetitions << 1 ) |
| // |
| // Each run is preceded by a varint. The varint's least significant bit is |
| // used to indicate whether the run is a literal run or a repeated run. The rest |
| // of the varint is used to determine the length of the run (eg how many times the |
| // value repeats). |
| // |
| // In the case of literal runs, the run length is always a multiple of 8 (i.e. encode |
| // in groups of 8), so that no matter the bit-width of the value, the sequence will end |
| // on a byte boundary without padding. |
| // Given that we know it is a multiple of 8, we store the number of 8-groups rather than |
| // the actual number of encoded ints. (This means that the total number of encoded values |
| // can not be determined from the encoded data, since the number of values in the last |
| // group may not be a multiple of 8). For the last group of literal runs, we pad |
| // the group to 8 with zeros. This allows for 8 at a time decoding on the read side |
| // without the need for additional checks. |
| // |
| // There is a break-even point when it is more storage efficient to do run length |
| // encoding. For 1 bit-width values, that point is 8 values. They require 2 bytes |
| // for both the repeated encoding or the literal encoding. This value can always |
| // be computed based on the bit-width. |
| // |
| // Examples with bit-width 1 (eg encoding booleans): |
| // ---------------------------------------- |
| // 100 1s followed by 100 0s: |
| // <varint(100 << 1)> <1, padded to 1 byte> <varint(100 << 1)> <0, padded to 1 byte> |
| // - (total 4 bytes) |
| // |
| // alternating 1s and 0s (200 total): |
| // 200 ints = 25 groups of 8 |
| // <varint((25 << 1) | 1)> <25 bytes of values, bitpacked> |
| // (total 26 bytes, 1 byte overhead) |
| // |
| |
| type RleDecoder struct { |
| r *BitReader |
| |
| bitWidth int |
| curVal uint64 |
| repCount int32 |
| litCount int32 |
| } |
| |
| func NewRleDecoder(data *bytes.Reader, width int) *RleDecoder { |
| return &RleDecoder{r: NewBitReader(data), bitWidth: width} |
| } |
| |
| func (r *RleDecoder) Reset(data *bytes.Reader, width int) { |
| r.bitWidth = width |
| r.curVal = 0 |
| r.repCount = 0 |
| r.litCount = 0 |
| r.r.Reset(data) |
| } |
| |
| func (r *RleDecoder) Next() bool { |
| indicator, ok := r.r.GetVlqInt() |
| if !ok { |
| return false |
| } |
| |
| literal := (indicator & 1) != 0 |
| count := uint32(indicator >> 1) |
| if literal { |
| if count == 0 || count > uint32(math.MaxInt32/8) { |
| return false |
| } |
| r.litCount = int32(count) * 8 |
| } else { |
| if count == 0 || count > uint32(math.MaxInt32) { |
| return false |
| } |
| r.repCount = int32(count) |
| |
| nbytes := int(bitutil.BytesForBits(int64(r.bitWidth))) |
| switch { |
| case nbytes > 4: |
| if !r.r.GetAligned(nbytes, &r.curVal) { |
| return false |
| } |
| case nbytes > 2: |
| var val uint32 |
| if !r.r.GetAligned(nbytes, &val) { |
| return false |
| } |
| r.curVal = uint64(val) |
| case nbytes > 1: |
| var val uint16 |
| if !r.r.GetAligned(nbytes, &val) { |
| return false |
| } |
| r.curVal = uint64(val) |
| default: |
| var val uint8 |
| if !r.r.GetAligned(nbytes, &val) { |
| return false |
| } |
| r.curVal = uint64(val) |
| } |
| } |
| return true |
| } |
| |
| func (r *RleDecoder) GetValue() (uint64, bool) { |
| vals := make([]uint64, 1) |
| n := r.GetBatch(vals) |
| return vals[0], n == 1 |
| } |
| |
| func (r *RleDecoder) GetBatch(values []uint64) int { |
| read := 0 |
| size := len(values) |
| |
| out := values |
| for read < size { |
| remain := size - read |
| |
| if r.repCount > 0 { |
| repbatch := int(math.Min(float64(remain), float64(r.repCount))) |
| for i := 0; i < repbatch; i++ { |
| out[i] = r.curVal |
| } |
| |
| r.repCount -= int32(repbatch) |
| read += repbatch |
| out = out[repbatch:] |
| } else if r.litCount > 0 { |
| litbatch := int(math.Min(float64(remain), float64(r.litCount))) |
| n, _ := r.r.GetBatch(uint(r.bitWidth), out[:litbatch]) |
| if n != litbatch { |
| return read |
| } |
| |
| r.litCount -= int32(litbatch) |
| read += litbatch |
| out = out[litbatch:] |
| } else { |
| if !r.Next() { |
| return read |
| } |
| } |
| } |
| return read |
| } |
| |
| func (r *RleDecoder) GetBatchSpaced(vals []uint64, nullcount int, validBits []byte, validBitsOffset int64) (int, error) { |
| if nullcount == 0 { |
| return r.GetBatch(vals), nil |
| } |
| |
| converter := plainConverter{} |
| blockCounter := NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals))) |
| |
| var ( |
| totalProcessed int |
| processed int |
| block BitBlockCount |
| err error |
| ) |
| |
| for { |
| block = blockCounter.NextFourWords() |
| if block.Len == 0 { |
| break |
| } |
| |
| if block.AllSet() { |
| processed = r.GetBatch(vals[:block.Len]) |
| } else if block.NoneSet() { |
| converter.FillZero(vals[:block.Len]) |
| processed = int(block.Len) |
| } else { |
| processed, err = r.getspaced(converter, vals, int(block.Len), int(block.Len-block.Popcnt), validBits, validBitsOffset) |
| if err != nil { |
| return totalProcessed, err |
| } |
| } |
| |
| totalProcessed += processed |
| vals = vals[int(block.Len):] |
| validBitsOffset += int64(block.Len) |
| |
| if processed != int(block.Len) { |
| break |
| } |
| } |
| return totalProcessed, nil |
| } |
| |
| func (r *RleDecoder) getspaced(dc DictionaryConverter, vals interface{}, batchSize, nullCount int, validBits []byte, validBitsOffset int64) (int, error) { |
| switch vals := vals.(type) { |
| case []int32: |
| return r.getspacedInt32(dc, vals, batchSize, nullCount, validBits, validBitsOffset) |
| case []int64: |
| return r.getspacedInt64(dc, vals, batchSize, nullCount, validBits, validBitsOffset) |
| case []float32: |
| return r.getspacedFloat32(dc, vals, batchSize, nullCount, validBits, validBitsOffset) |
| case []float64: |
| return r.getspacedFloat64(dc, vals, batchSize, nullCount, validBits, validBitsOffset) |
| case []parquet.ByteArray: |
| return r.getspacedByteArray(dc, vals, batchSize, nullCount, validBits, validBitsOffset) |
| case []parquet.FixedLenByteArray: |
| return r.getspacedFixedLenByteArray(dc, vals, batchSize, nullCount, validBits, validBitsOffset) |
| case []parquet.Int96: |
| return r.getspacedInt96(dc, vals, batchSize, nullCount, validBits, validBitsOffset) |
| case []uint64: |
| return r.getspacedUint64(dc, vals, batchSize, nullCount, validBits, validBitsOffset) |
| default: |
| return 0, xerrors.New("parquet/rle: getspaced invalid type") |
| } |
| } |
| |
| func (r *RleDecoder) getspacedUint64(dc DictionaryConverter, vals []uint64, batchSize, nullCount int, validBits []byte, validBitsOffset int64) (int, error) { |
| if nullCount == batchSize { |
| dc.FillZero(vals[:batchSize]) |
| return batchSize, nil |
| } |
| |
| read := 0 |
| remain := batchSize - nullCount |
| |
| const bufferSize = 1024 |
| var indexbuffer [bufferSize]IndexType |
| |
| // assume no bits to start |
| bitReader := NewBitRunReader(validBits, validBitsOffset, int64(batchSize)) |
| validRun := bitReader.NextRun() |
| for read < batchSize { |
| if validRun.Len == 0 { |
| validRun = bitReader.NextRun() |
| } |
| |
| if !validRun.Set { |
| dc.FillZero(vals[:int(validRun.Len)]) |
| vals = vals[int(validRun.Len):] |
| read += int(validRun.Len) |
| validRun.Len = 0 |
| continue |
| } |
| |
| if r.repCount == 0 && r.litCount == 0 { |
| if !r.Next() { |
| return read, nil |
| } |
| } |
| |
| var batch int |
| switch { |
| case r.repCount > 0: |
| batch, remain, validRun = r.consumeRepeatCounts(read, batchSize, remain, validRun, bitReader) |
| current := IndexType(r.curVal) |
| if !dc.IsValid(current) { |
| return read, nil |
| } |
| dc.Fill(vals[:batch], current) |
| case r.litCount > 0: |
| var ( |
| litread int |
| skipped int |
| err error |
| ) |
| litread, skipped, validRun, err = r.consumeLiteralsUint64(dc, vals, remain, indexbuffer[:], validRun, bitReader) |
| if err != nil { |
| return read, err |
| } |
| batch = litread + skipped |
| remain -= litread |
| } |
| |
| vals = vals[batch:] |
| read += batch |
| } |
| return read, nil |
| } |
| |
| func (r *RleDecoder) consumeRepeatCounts(read, batchSize, remain int, run BitRun, bitRdr BitRunReader) (int, int, BitRun) { |
| // Consume the entire repeat counts incrementing repeat_batch to |
| // be the total of nulls + values consumed, we only need to |
| // get the total count because we can fill in the same value for |
| // nulls and non-nulls. This proves to be a big efficiency win. |
| repeatBatch := 0 |
| for r.repCount > 0 && (read+repeatBatch) < batchSize { |
| if run.Set { |
| updateSize := int(Min(run.Len, int64(r.repCount))) |
| r.repCount -= int32(updateSize) |
| repeatBatch += updateSize |
| run.Len -= int64(updateSize) |
| remain -= updateSize |
| } else { |
| repeatBatch += int(run.Len) |
| run.Len = 0 |
| } |
| |
| if run.Len == 0 { |
| run = bitRdr.NextRun() |
| } |
| } |
| return repeatBatch, remain, run |
| } |
| |
| func (r *RleDecoder) consumeLiteralsUint64(dc DictionaryConverter, vals []uint64, remain int, buf []IndexType, run BitRun, bitRdr BitRunReader) (int, int, BitRun, error) { |
| batch := MinInt(MinInt(remain, int(r.litCount)), len(buf)) |
| buf = buf[:batch] |
| |
| n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf) |
| if n != batch { |
| return 0, 0, run, xerrors.New("was not able to retrieve correct number of indexes") |
| } |
| |
| if !dc.IsValid(buf...) { |
| return 0, 0, run, xerrors.New("invalid index values found for dictionary converter") |
| } |
| |
| var ( |
| read int |
| skipped int |
| ) |
| for read < batch { |
| if run.Set { |
| updateSize := MinInt(batch-read, int(run.Len)) |
| if err := dc.Copy(vals, buf[read:read+updateSize]); err != nil { |
| return 0, 0, run, err |
| } |
| read += updateSize |
| vals = vals[updateSize:] |
| run.Len -= int64(updateSize) |
| } else { |
| dc.FillZero(vals[:int(run.Len)]) |
| vals = vals[int(run.Len):] |
| skipped += int(run.Len) |
| run.Len = 0 |
| } |
| if run.Len == 0 { |
| run = bitRdr.NextRun() |
| } |
| } |
| r.litCount -= int32(batch) |
| return read, skipped, run, nil |
| } |
| |
| func (r *RleDecoder) GetBatchWithDict(dc DictionaryConverter, vals interface{}) (int, error) { |
| switch vals := vals.(type) { |
| case []int32: |
| return r.GetBatchWithDictInt32(dc, vals) |
| case []int64: |
| return r.GetBatchWithDictInt64(dc, vals) |
| case []float32: |
| return r.GetBatchWithDictFloat32(dc, vals) |
| case []float64: |
| return r.GetBatchWithDictFloat64(dc, vals) |
| case []parquet.ByteArray: |
| return r.GetBatchWithDictByteArray(dc, vals) |
| case []parquet.FixedLenByteArray: |
| return r.GetBatchWithDictFixedLenByteArray(dc, vals) |
| case []parquet.Int96: |
| return r.GetBatchWithDictInt96(dc, vals) |
| default: |
| return 0, xerrors.New("parquet/rle: GetBatchWithDict invalid type") |
| } |
| } |
| |
| func (r *RleDecoder) GetBatchWithDictSpaced(dc DictionaryConverter, vals interface{}, nullCount int, validBits []byte, validBitsOffset int64) (int, error) { |
| switch vals := vals.(type) { |
| case []int32: |
| return r.GetBatchWithDictSpacedInt32(dc, vals, nullCount, validBits, validBitsOffset) |
| case []int64: |
| return r.GetBatchWithDictSpacedInt64(dc, vals, nullCount, validBits, validBitsOffset) |
| case []float32: |
| return r.GetBatchWithDictSpacedFloat32(dc, vals, nullCount, validBits, validBitsOffset) |
| case []float64: |
| return r.GetBatchWithDictSpacedFloat64(dc, vals, nullCount, validBits, validBitsOffset) |
| case []parquet.ByteArray: |
| return r.GetBatchWithDictSpacedByteArray(dc, vals, nullCount, validBits, validBitsOffset) |
| case []parquet.FixedLenByteArray: |
| return r.GetBatchWithDictSpacedFixedLenByteArray(dc, vals, nullCount, validBits, validBitsOffset) |
| case []parquet.Int96: |
| return r.GetBatchWithDictSpacedInt96(dc, vals, nullCount, validBits, validBitsOffset) |
| default: |
| return 0, xerrors.New("parquet/rle: GetBatchWithDictSpaced invalid type") |
| } |
| } |
| |
| type RleEncoder struct { |
| w *BitWriter |
| |
| buffer []uint64 |
| BitWidth int |
| curVal uint64 |
| repCount int32 |
| litCount int32 |
| literalIndicatorOffset int |
| |
| indicatorBuffer [1]byte |
| } |
| |
| func NewRleEncoder(w io.WriterAt, width int) *RleEncoder { |
| return &RleEncoder{ |
| w: NewBitWriter(w), |
| buffer: make([]uint64, 0, 8), |
| BitWidth: width, |
| literalIndicatorOffset: -1, |
| } |
| } |
| |
| func (r *RleEncoder) Flush() int { |
| if r.litCount > 0 || r.repCount > 0 || len(r.buffer) > 0 { |
| allRep := r.litCount == 0 && (r.repCount == int32(len(r.buffer)) || len(r.buffer) == 0) |
| if r.repCount > 0 && allRep { |
| r.flushRepeated() |
| } else { |
| // buffer the last grou pof literals to 8 by padding with 0s |
| for len(r.buffer) != 0 && len(r.buffer) < 8 { |
| r.buffer = append(r.buffer, 0) |
| } |
| |
| r.litCount += int32(len(r.buffer)) |
| r.flushLiteral(true) |
| r.repCount = 0 |
| } |
| } |
| r.w.Flush(false) |
| return r.w.Written() |
| } |
| |
| func (r *RleEncoder) flushBuffered(done bool) (err error) { |
| if r.repCount >= 8 { |
| // clear buffered values. they are part of the repeated run now and we |
| // don't want to flush them as literals |
| r.buffer = r.buffer[:0] |
| if r.litCount != 0 { |
| // there was current literal run. all values flushed but need to update the indicator |
| err = r.flushLiteral(true) |
| } |
| return |
| } |
| |
| r.litCount += int32(len(r.buffer)) |
| ngroups := r.litCount / 8 |
| if ngroups+1 >= (1 << 6) { |
| // we need to start a new literal run because the indicator byte we've reserved |
| // cannot store any more values |
| err = r.flushLiteral(true) |
| } else { |
| err = r.flushLiteral(done) |
| } |
| r.repCount = 0 |
| return |
| } |
| |
| func (r *RleEncoder) flushLiteral(updateIndicator bool) (err error) { |
| if r.literalIndicatorOffset == -1 { |
| r.literalIndicatorOffset = r.w.ReserveBytes(1) |
| } |
| |
| for _, val := range r.buffer { |
| if err = r.w.WriteValue(val, uint(r.BitWidth)); err != nil { |
| return |
| } |
| } |
| r.buffer = r.buffer[:0] |
| |
| if updateIndicator { |
| // at this point we need to write the indicator byte for the literal run. |
| // we only reserve one byte, to allow for streaming writes of literal values. |
| // the logic makes sure we flush literal runs often enough to not overrun the 1 byte. |
| ngroups := r.litCount / 8 |
| r.indicatorBuffer[0] = byte((ngroups << 1) | 1) |
| _, err = r.w.WriteAt(r.indicatorBuffer[:], int64(r.literalIndicatorOffset)) |
| r.literalIndicatorOffset = -1 |
| r.litCount = 0 |
| } |
| return |
| } |
| |
| func (r *RleEncoder) flushRepeated() (ret bool) { |
| indicator := r.repCount << 1 |
| |
| ret = r.w.WriteVlqInt(uint64(indicator)) |
| ret = ret && r.w.WriteAligned(r.curVal, int(bitutil.BytesForBits(int64(r.BitWidth)))) |
| |
| r.repCount = 0 |
| r.buffer = r.buffer[:0] |
| return |
| } |
| |
| // Put buffers input values 8 at a time. after seeing all 8 values, |
| // it decides whether they should be encoded as a literal or repeated run. |
| func (r *RleEncoder) Put(value uint64) error { |
| if r.curVal == value { |
| r.repCount++ |
| if r.repCount > 8 { |
| // this is just a continuation of the current run, no need to buffer the values |
| // NOTE this is the fast path for long repeated runs |
| return nil |
| } |
| } else { |
| if r.repCount >= 8 { |
| if !r.flushRepeated() { |
| return xerrors.New("failed to flush repeated value") |
| } |
| } |
| r.repCount = 1 |
| r.curVal = value |
| } |
| |
| r.buffer = append(r.buffer, value) |
| if len(r.buffer) == 8 { |
| return r.flushBuffered(false) |
| } |
| return nil |
| } |
| |
| func (r *RleEncoder) Clear() { |
| r.curVal = 0 |
| r.repCount = 0 |
| r.buffer = r.buffer[:0] |
| r.litCount = 0 |
| r.literalIndicatorOffset = -1 |
| r.w.Clear() |
| } |