| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package utils |
| |
| import ( |
| "encoding/binary" |
| "errors" |
| "io" |
| "math" |
| "reflect" |
| "unsafe" |
| |
| "github.com/apache/arrow/go/v6/arrow" |
| "github.com/apache/arrow/go/v6/arrow/bitutil" |
| "github.com/apache/arrow/go/v6/arrow/memory" |
| ) |
| |
// trailingMask[n] has its n lowest-order bits set and every higher bit clear,
// for n in [0, 63]. It is populated once, by init.
var trailingMask [64]uint64

func init() {
	// Compute the table at start-up instead of spelling out 64 literals.
	// Entry 0 uses a shift count of 64, which Go defines to produce zero
	// for an unsigned operand, giving the expected empty mask.
	for n := range trailingMask {
		trailingMask[n] = math.MaxUint64 >> (64 - n)
	}
}
| |
| // trailingBits returns a value constructed from the bits trailing bits of |
| // the value v that is passed in. If bits >= 64, then we just return v. |
| func trailingBits(v uint64, bits uint) uint64 { |
| if bits >= 64 { |
| return v |
| } |
| return v & trailingMask[bits] |
| } |
| |
// reader is the set of capabilities the BitReader needs from its source:
// sequential reads and seeking (io.ReadSeeker) plus positioned reads that do
// not move the cursor (io.ReaderAt).
type reader interface {
	io.ReadSeeker
	io.ReaderAt
}
| |
// buflen is the number of uint32 slots in a BitReader's unpack scratch buffer.
const buflen = 1 << 10
| |
| // BitReader implements functionality for reading bits or bytes buffering up to a uint64 |
| // at a time from the reader in order to improve efficiency. It also provides |
| // methods to read multiple bytes in one read such as encoded ints/values. |
| // |
| // This BitReader is the basis for the other utility classes like RLE decoding |
| // and such, providing the necessary functions for interpreting the values. |
| type BitReader struct { |
| reader reader |
| buffer uint64 |
| byteoffset int64 |
| bitoffset uint |
| raw [8]byte |
| |
| unpackBuf [buflen]uint32 |
| } |
| |
| // NewBitReader takes in a reader that implements io.Reader, io.ReaderAt and io.Seeker |
| // interfaces and returns a BitReader for use with various bit level manipulations. |
| func NewBitReader(r reader) *BitReader { |
| return &BitReader{reader: r} |
| } |
| |
| // CurOffset returns the current Byte offset into the data that the reader is at. |
| func (b *BitReader) CurOffset() int64 { |
| return b.byteoffset + bitutil.BytesForBits(int64(b.bitoffset)) |
| } |
| |
| // Reset allows reusing a BitReader by setting a new reader and resetting the internal |
| // state back to zeros. |
| func (b *BitReader) Reset(r reader) { |
| b.reader = r |
| b.buffer = 0 |
| b.byteoffset = 0 |
| b.bitoffset = 0 |
| } |
| |
| // GetVlqInt reads a Vlq encoded int from the stream. The encoded value must start |
| // at the beginning of a byte and this returns false if there weren't enough bytes |
| // in the buffer or reader. This will call `ReadByte` which in turn retrieves byte |
| // aligned values from the reader |
| func (b *BitReader) GetVlqInt() (uint64, bool) { |
| tmp, err := binary.ReadUvarint(b) |
| if err != nil { |
| return 0, false |
| } |
| return tmp, true |
| } |
| |
| // GetZigZagVlqInt reads a zigzag encoded integer, returning false if there weren't |
| // enough bytes remaining. |
| func (b *BitReader) GetZigZagVlqInt() (int64, bool) { |
| u, ok := b.GetVlqInt() |
| if !ok { |
| return 0, false |
| } |
| |
| return int64(u>>1) ^ -int64(u&1), true |
| } |
| |
| // ReadByte reads a single aligned byte from the underlying stream, or populating |
| // error if there aren't enough bytes left. |
| func (b *BitReader) ReadByte() (byte, error) { |
| var tmp byte |
| if ok := b.GetAligned(1, &tmp); !ok { |
| return 0, errors.New("failed to read byte") |
| } |
| |
| return tmp, nil |
| } |
| |
| // GetAligned reads nbytes from the underlying stream into the passed interface value. |
| // Returning false if there aren't enough bytes remaining in the stream or if an invalid |
| // type is passed. The bytes are read aligned to byte boundaries. |
| // |
| // v must be a pointer to a byte or sized uint type (*byte, *uint16, *uint32, *uint64). |
| // encoded values are assumed to be little endian. |
| func (b *BitReader) GetAligned(nbytes int, v interface{}) bool { |
| // figure out the number of bytes to represent v |
| typBytes := int(reflect.TypeOf(v).Elem().Size()) |
| if nbytes > typBytes { |
| return false |
| } |
| |
| bread := bitutil.BytesForBits(int64(b.bitoffset)) |
| |
| b.byteoffset += bread |
| n, err := b.reader.ReadAt(b.raw[:nbytes], b.byteoffset) |
| if err != nil && err != io.EOF { |
| return false |
| } |
| if n != nbytes { |
| return false |
| } |
| // zero pad the the bytes |
| memory.Set(b.raw[n:typBytes], 0) |
| |
| switch v := v.(type) { |
| case *byte: |
| *v = b.raw[0] |
| case *uint64: |
| *v = binary.LittleEndian.Uint64(b.raw[:typBytes]) |
| case *uint32: |
| *v = binary.LittleEndian.Uint32(b.raw[:typBytes]) |
| case *uint16: |
| *v = binary.LittleEndian.Uint16(b.raw[:typBytes]) |
| default: |
| return false |
| } |
| |
| b.byteoffset += int64(nbytes) |
| |
| b.bitoffset = 0 |
| b.fillbuffer() |
| return true |
| } |
| |
| // fillbuffer fills the uint64 buffer with bytes from the underlying stream |
| func (b *BitReader) fillbuffer() error { |
| n, err := b.reader.ReadAt(b.raw[:], b.byteoffset) |
| if err != nil && n == 0 && err != io.EOF { |
| return err |
| } |
| for i := n; i < 8; i++ { |
| b.raw[i] = 0 |
| } |
| b.buffer = binary.LittleEndian.Uint64(b.raw[:]) |
| return nil |
| } |
| |
| // next reads an integral value from the next bits in the buffer |
| func (b *BitReader) next(bits uint) (v uint64, err error) { |
| v = trailingBits(b.buffer, b.bitoffset+bits) >> b.bitoffset |
| b.bitoffset += bits |
| // if we need more bits to get what was requested then refill the buffer |
| if b.bitoffset >= 64 { |
| b.byteoffset += 8 |
| b.bitoffset -= 64 |
| if err = b.fillbuffer(); err != nil { |
| return 0, err |
| } |
| v |= trailingBits(b.buffer, b.bitoffset) << (bits - b.bitoffset) |
| } |
| return |
| } |
| |
| // GetBatchIndex is like GetBatch but for IndexType (used for dictionary decoding) |
| func (b *BitReader) GetBatchIndex(bits uint, out []IndexType) (i int, err error) { |
| // IndexType is a 32-bit value so bits must be less than 32 when unpacking |
| // values using the bitreader. |
| if bits > 32 { |
| return 0, errors.New("must be 32 bits or less per read") |
| } |
| |
| var val uint64 |
| |
| length := len(out) |
| // if we're not currently byte-aligned, read bits until we are byte-aligned. |
| for ; i < length && b.bitoffset != 0; i++ { |
| val, err = b.next(bits) |
| out[i] = IndexType(val) |
| if err != nil { |
| return |
| } |
| } |
| |
| b.reader.Seek(b.byteoffset, io.SeekStart) |
| // grab as many 32 byte chunks as possible in one shot |
| if i < length { // IndexType should be a 32 bit value so we can do quick unpacking right into the output |
| numUnpacked := unpack32(b.reader, (*(*[]uint32)(unsafe.Pointer(&out)))[i:], int(bits)) |
| i += numUnpacked |
| b.byteoffset += int64(numUnpacked * int(bits) / 8) |
| } |
| |
| // re-fill our buffer just in case. |
| b.fillbuffer() |
| // grab the remaining values that aren't 32 byte aligned |
| for ; i < length; i++ { |
| val, err = b.next(bits) |
| out[i] = IndexType(val) |
| if err != nil { |
| break |
| } |
| } |
| return |
| } |
| |
| // GetBatchBools is like GetBatch but optimized for reading bits as boolean values |
| func (b *BitReader) GetBatchBools(out []bool) (int, error) { |
| bits := uint(1) |
| length := len(out) |
| |
| i := 0 |
| // read until we are byte-aligned |
| for ; i < length && b.bitoffset != 0; i++ { |
| val, err := b.next(bits) |
| out[i] = val != 0 |
| if err != nil { |
| return i, err |
| } |
| } |
| |
| b.reader.Seek(b.byteoffset, io.SeekStart) |
| buf := arrow.Uint32Traits.CastToBytes(b.unpackBuf[:]) |
| blen := buflen * 8 |
| for i < length { |
| // grab byte-aligned bits in a loop since it's more efficient than going |
| // bit by bit when you can grab 8 bools at a time. |
| unpackSize := MinInt(blen, length-i) / 8 * 8 |
| n, err := b.reader.Read(buf[:bitutil.BytesForBits(int64(unpackSize))]) |
| if err != nil { |
| return i, err |
| } |
| BytesToBools(buf[:n], out[i:]) |
| i += unpackSize |
| b.byteoffset += int64(n) |
| } |
| |
| b.fillbuffer() |
| // grab the trailing bits |
| for ; i < length; i++ { |
| val, err := b.next(bits) |
| out[i] = val != 0 |
| if err != nil { |
| return i, err |
| } |
| } |
| |
| return i, nil |
| } |
| |
| // GetBatch fills out by decoding values repeated from the stream that are encoded |
| // using bits as the number of bits per value. The values are expected to be bit packed |
| // so we will unpack the values to populate. |
| func (b *BitReader) GetBatch(bits uint, out []uint64) (int, error) { |
| // since we're unpacking into uint64 values, we can't support bits being |
| // larger than 64 here as that's the largest size value we're reading |
| if bits > 64 { |
| return 0, errors.New("must be 64 bits or less per read") |
| } |
| |
| length := len(out) |
| |
| i := 0 |
| // read until we are byte aligned |
| for ; i < length && b.bitoffset != 0; i++ { |
| val, err := b.next(bits) |
| out[i] = val |
| if err != nil { |
| return i, err |
| } |
| } |
| |
| b.reader.Seek(b.byteoffset, io.SeekStart) |
| for i < length { |
| // unpack groups of 32 bytes at a time into a buffer since it's more efficient |
| unpackSize := MinInt(buflen, length-i) |
| numUnpacked := unpack32(b.reader, b.unpackBuf[:unpackSize], int(bits)) |
| if numUnpacked == 0 { |
| break |
| } |
| |
| for k := 0; k < numUnpacked; k++ { |
| out[i+k] = uint64(b.unpackBuf[k]) |
| } |
| i += numUnpacked |
| b.byteoffset += int64(numUnpacked * int(bits) / 8) |
| } |
| |
| b.fillbuffer() |
| // and then the remaining trailing values |
| for ; i < length; i++ { |
| val, err := b.next(bits) |
| out[i] = val |
| if err != nil { |
| return i, err |
| } |
| } |
| |
| return i, nil |
| } |
| |
| // GetValue returns a single value that is bit packed using width as the number of bits |
| // and returns false if there weren't enough bits remaining. |
| func (b *BitReader) GetValue(width int) (uint64, bool) { |
| v := make([]uint64, 1) |
| n, _ := b.GetBatch(uint(width), v) |
| return v[0], n == 1 |
| } |