blob: c09a13e7c5782e513cbcb1cf3fdcad008e1ce145 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package utils
import (
"encoding/binary"
"errors"
"io"
"math"
"reflect"
"unsafe"
"github.com/apache/arrow/go/v6/arrow"
"github.com/apache/arrow/go/v6/arrow/bitutil"
"github.com/apache/arrow/go/v6/arrow/memory"
)
// masks for grabbing the trailing bits based on the number of trailing bits desired
var trailingMask [64]uint64
func init() {
// generate the masks at init so we don't have to hard code them.
for i := 0; i < 64; i++ {
trailingMask[i] = (math.MaxUint64 >> (64 - i))
}
}
// trailingBits returns a value constructed from the bits trailing bits of
// the value v that is passed in. If bits >= 64, then we just return v.
func trailingBits(v uint64, bits uint) uint64 {
if bits >= 64 {
return v
}
return v & trailingMask[bits]
}
// reader is a useful interface to define the functionality we need for implementation
type reader interface {
io.Reader
io.ReaderAt
io.Seeker
}
// default buffer length
const buflen = 1024
// BitReader implements functionality for reading bits or bytes buffering up to a uint64
// at a time from the reader in order to improve efficiency. It also provides
// methods to read multiple bytes in one read such as encoded ints/values.
//
// This BitReader is the basis for the other utility classes like RLE decoding
// and such, providing the necessary functions for interpreting the values.
type BitReader struct {
reader reader
buffer uint64
byteoffset int64
bitoffset uint
raw [8]byte
unpackBuf [buflen]uint32
}
// NewBitReader takes in a reader that implements io.Reader, io.ReaderAt and io.Seeker
// interfaces and returns a BitReader for use with various bit level manipulations.
func NewBitReader(r reader) *BitReader {
return &BitReader{reader: r}
}
// CurOffset returns the current Byte offset into the data that the reader is at.
func (b *BitReader) CurOffset() int64 {
return b.byteoffset + bitutil.BytesForBits(int64(b.bitoffset))
}
// Reset allows reusing a BitReader by setting a new reader and resetting the internal
// state back to zeros.
func (b *BitReader) Reset(r reader) {
b.reader = r
b.buffer = 0
b.byteoffset = 0
b.bitoffset = 0
}
// GetVlqInt reads a Vlq encoded int from the stream. The encoded value must start
// at the beginning of a byte and this returns false if there weren't enough bytes
// in the buffer or reader. This will call `ReadByte` which in turn retrieves byte
// aligned values from the reader
func (b *BitReader) GetVlqInt() (uint64, bool) {
tmp, err := binary.ReadUvarint(b)
if err != nil {
return 0, false
}
return tmp, true
}
// GetZigZagVlqInt reads a zigzag encoded integer, returning false if there weren't
// enough bytes remaining.
func (b *BitReader) GetZigZagVlqInt() (int64, bool) {
u, ok := b.GetVlqInt()
if !ok {
return 0, false
}
return int64(u>>1) ^ -int64(u&1), true
}
// ReadByte reads a single aligned byte from the underlying stream, or populating
// error if there aren't enough bytes left.
func (b *BitReader) ReadByte() (byte, error) {
var tmp byte
if ok := b.GetAligned(1, &tmp); !ok {
return 0, errors.New("failed to read byte")
}
return tmp, nil
}
// GetAligned reads nbytes from the underlying stream into the passed interface value.
// Returning false if there aren't enough bytes remaining in the stream or if an invalid
// type is passed. The bytes are read aligned to byte boundaries.
//
// v must be a pointer to a byte or sized uint type (*byte, *uint16, *uint32, *uint64).
// encoded values are assumed to be little endian.
func (b *BitReader) GetAligned(nbytes int, v interface{}) bool {
// figure out the number of bytes to represent v
typBytes := int(reflect.TypeOf(v).Elem().Size())
if nbytes > typBytes {
return false
}
bread := bitutil.BytesForBits(int64(b.bitoffset))
b.byteoffset += bread
n, err := b.reader.ReadAt(b.raw[:nbytes], b.byteoffset)
if err != nil && err != io.EOF {
return false
}
if n != nbytes {
return false
}
// zero pad the the bytes
memory.Set(b.raw[n:typBytes], 0)
switch v := v.(type) {
case *byte:
*v = b.raw[0]
case *uint64:
*v = binary.LittleEndian.Uint64(b.raw[:typBytes])
case *uint32:
*v = binary.LittleEndian.Uint32(b.raw[:typBytes])
case *uint16:
*v = binary.LittleEndian.Uint16(b.raw[:typBytes])
default:
return false
}
b.byteoffset += int64(nbytes)
b.bitoffset = 0
b.fillbuffer()
return true
}
// fillbuffer fills the uint64 buffer with bytes from the underlying stream
func (b *BitReader) fillbuffer() error {
n, err := b.reader.ReadAt(b.raw[:], b.byteoffset)
if err != nil && n == 0 && err != io.EOF {
return err
}
for i := n; i < 8; i++ {
b.raw[i] = 0
}
b.buffer = binary.LittleEndian.Uint64(b.raw[:])
return nil
}
// next reads an integral value from the next bits in the buffer
func (b *BitReader) next(bits uint) (v uint64, err error) {
v = trailingBits(b.buffer, b.bitoffset+bits) >> b.bitoffset
b.bitoffset += bits
// if we need more bits to get what was requested then refill the buffer
if b.bitoffset >= 64 {
b.byteoffset += 8
b.bitoffset -= 64
if err = b.fillbuffer(); err != nil {
return 0, err
}
v |= trailingBits(b.buffer, b.bitoffset) << (bits - b.bitoffset)
}
return
}
// GetBatchIndex is like GetBatch but for IndexType (used for dictionary decoding)
func (b *BitReader) GetBatchIndex(bits uint, out []IndexType) (i int, err error) {
// IndexType is a 32-bit value so bits must be less than 32 when unpacking
// values using the bitreader.
if bits > 32 {
return 0, errors.New("must be 32 bits or less per read")
}
var val uint64
length := len(out)
// if we're not currently byte-aligned, read bits until we are byte-aligned.
for ; i < length && b.bitoffset != 0; i++ {
val, err = b.next(bits)
out[i] = IndexType(val)
if err != nil {
return
}
}
b.reader.Seek(b.byteoffset, io.SeekStart)
// grab as many 32 byte chunks as possible in one shot
if i < length { // IndexType should be a 32 bit value so we can do quick unpacking right into the output
numUnpacked := unpack32(b.reader, (*(*[]uint32)(unsafe.Pointer(&out)))[i:], int(bits))
i += numUnpacked
b.byteoffset += int64(numUnpacked * int(bits) / 8)
}
// re-fill our buffer just in case.
b.fillbuffer()
// grab the remaining values that aren't 32 byte aligned
for ; i < length; i++ {
val, err = b.next(bits)
out[i] = IndexType(val)
if err != nil {
break
}
}
return
}
// GetBatchBools is like GetBatch but optimized for reading bits as boolean values
func (b *BitReader) GetBatchBools(out []bool) (int, error) {
bits := uint(1)
length := len(out)
i := 0
// read until we are byte-aligned
for ; i < length && b.bitoffset != 0; i++ {
val, err := b.next(bits)
out[i] = val != 0
if err != nil {
return i, err
}
}
b.reader.Seek(b.byteoffset, io.SeekStart)
buf := arrow.Uint32Traits.CastToBytes(b.unpackBuf[:])
blen := buflen * 8
for i < length {
// grab byte-aligned bits in a loop since it's more efficient than going
// bit by bit when you can grab 8 bools at a time.
unpackSize := MinInt(blen, length-i) / 8 * 8
n, err := b.reader.Read(buf[:bitutil.BytesForBits(int64(unpackSize))])
if err != nil {
return i, err
}
BytesToBools(buf[:n], out[i:])
i += unpackSize
b.byteoffset += int64(n)
}
b.fillbuffer()
// grab the trailing bits
for ; i < length; i++ {
val, err := b.next(bits)
out[i] = val != 0
if err != nil {
return i, err
}
}
return i, nil
}
// GetBatch fills out by decoding values repeated from the stream that are encoded
// using bits as the number of bits per value. The values are expected to be bit packed
// so we will unpack the values to populate.
func (b *BitReader) GetBatch(bits uint, out []uint64) (int, error) {
// since we're unpacking into uint64 values, we can't support bits being
// larger than 64 here as that's the largest size value we're reading
if bits > 64 {
return 0, errors.New("must be 64 bits or less per read")
}
length := len(out)
i := 0
// read until we are byte aligned
for ; i < length && b.bitoffset != 0; i++ {
val, err := b.next(bits)
out[i] = val
if err != nil {
return i, err
}
}
b.reader.Seek(b.byteoffset, io.SeekStart)
for i < length {
// unpack groups of 32 bytes at a time into a buffer since it's more efficient
unpackSize := MinInt(buflen, length-i)
numUnpacked := unpack32(b.reader, b.unpackBuf[:unpackSize], int(bits))
if numUnpacked == 0 {
break
}
for k := 0; k < numUnpacked; k++ {
out[i+k] = uint64(b.unpackBuf[k])
}
i += numUnpacked
b.byteoffset += int64(numUnpacked * int(bits) / 8)
}
b.fillbuffer()
// and then the remaining trailing values
for ; i < length; i++ {
val, err := b.next(bits)
out[i] = val
if err != nil {
return i, err
}
}
return i, nil
}
// GetValue returns a single value that is bit packed using width as the number of bits
// and returns false if there weren't enough bits remaining.
func (b *BitReader) GetValue(width int) (uint64, bool) {
v := make([]uint64, 1)
n, _ := b.GetBatch(uint(width), v)
return v[0], n == 1
}