// blob: fb6eac373134d632a827e007c242dc4c13b9eaed [file] [log] [blame]
// Code generated by typed_rle_dict.gen.go.tmpl. DO NOT EDIT.
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package utils
import (
"github.com/apache/arrow/go/v6/parquet"
"golang.org/x/xerrors"
)
// GetBatchWithDictSpacedInt32 decodes dictionary indexes into vals via dc while
// honouring the validity bitmap validBits, read from bit validBitsOffset.
//
// With nullCount == 0 the call is forwarded to GetBatchWithDictInt32. Otherwise
// the bitmap is consumed block by block: a block with every bit set uses
// GetBatchWithDictInt32, a block with no bit set is passed to dc.FillZero, and
// a mixed block is delegated to getspacedInt32.
//
// The result is the number of slots of vals processed, nulls included.
// Decoding stops without an error once a block is only partially processed; a
// helper error is returned along with the total from the preceding blocks.
func (r *RleDecoder) GetBatchWithDictSpacedInt32(dc DictionaryConverter, vals []int32, nullCount int, validBits []byte, validBitsOffset int64) (totalProcessed int, err error) {
	if nullCount == 0 {
		return r.GetBatchWithDictInt32(dc, vals)
	}

	counter := NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
	for blk := counter.NextFourWords(); blk.Len != 0; blk = counter.NextFourWords() {
		blockLen := int(blk.Len)

		var done int
		if blk.AllSet() {
			done, err = r.GetBatchWithDictInt32(dc, vals[:blockLen])
		} else if blk.NoneSet() {
			dc.FillZero(vals[:blockLen])
			done = blockLen
		} else {
			done, err = r.getspacedInt32(dc, vals, blockLen, blockLen-int(blk.Popcnt), validBits, validBitsOffset)
		}
		if err != nil {
			// The failing block's count is intentionally not added.
			return totalProcessed, err
		}

		totalProcessed += done
		if done != blockLen {
			// Short block: report what was decoded so far.
			return totalProcessed, nil
		}

		// Advance the output window and bitmap cursor past this block.
		vals = vals[blockLen:]
		validBitsOffset += int64(blockLen)
	}
	return totalProcessed, nil
}
// getspacedInt32 decodes one validity block of batchSize slots, nullCount of
// which are null according to validBits (read from bit validBitsOffset).
//
// Runs of unset validity bits are passed to dc.FillZero. For set bits, a
// pending repeated run is consumed through consumeRepeatCounts and written
// with dc.Fill, and a pending literal run is handled by consumeLiteralsInt32.
//
// It returns the number of slots of vals handled. The count is short, with a
// nil error, when r.Next reports that no further run is available or when
// dc.IsValid rejects the repeated index. Errors from dc.Fill and
// consumeLiteralsInt32 are returned to the caller.
func (r *RleDecoder) getspacedInt32(dc DictionaryConverter, vals []int32, batchSize, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
	if nullCount == batchSize {
		// Every slot is null: nothing needs to be decoded.
		dc.FillZero(vals[:batchSize])
		return batchSize, nil
	}

	read := 0
	// remain tracks how many non-null values are still to be decoded.
	remain := batchSize - nullCount

	const bufferSize = 1024
	var indexbuffer [bufferSize]IndexType

	// assume no bits to start
	bitReader := NewBitRunReader(validBits, validBitsOffset, int64(batchSize))
	validRun := bitReader.NextRun()
	for read < batchSize {
		if validRun.Len == 0 {
			validRun = bitReader.NextRun()
		}

		if !validRun.Set {
			// A run of nulls: fill the slots without touching the decoder.
			dc.FillZero(vals[:int(validRun.Len)])
			vals = vals[int(validRun.Len):]
			read += int(validRun.Len)
			validRun.Len = 0
			continue
		}

		// Both run counters are exhausted; ask the decoder for the next run.
		if r.repCount == 0 && r.litCount == 0 {
			if !r.Next() {
				return read, nil
			}
		}

		var batch int
		switch {
		case r.repCount > 0:
			batch, remain, validRun = r.consumeRepeatCounts(read, batchSize, remain, validRun, bitReader)
			current := IndexType(r.curVal)
			if !dc.IsValid(current) {
				return read, nil
			}
			// Surface fill failures instead of counting the slots as decoded.
			if err := dc.Fill(vals[:batch], current); err != nil {
				return read, err
			}
		case r.litCount > 0:
			var (
				litread, skipped int
				err              error
			)
			litread, skipped, validRun, err = r.consumeLiteralsInt32(dc, vals, remain, indexbuffer[:], validRun, bitReader)
			if err != nil {
				return read, err
			}
			// Nulls interleaved with the literals also occupy output slots.
			batch = litread + skipped
			remain -= litread
		}

		vals = vals[batch:]
		read += batch
	}
	return read, nil
}
// consumeLiteralsInt32 reads indexes from the current literal run and writes
// them into vals through dc, interleaving them with the null runs reported by
// run and bitRdr.
//
// At most min(remain, r.litCount, len(buf)) indexes are read, using buf as
// scratch space. It returns the number of indexes copied, the number of null
// slots passed to dc.FillZero, the validity run as it stands on return, and an
// error when too few indexes could be read, when dc.IsValid rejects them, or
// when dc.Copy fails. r.litCount is only reduced on success.
func (r *RleDecoder) consumeLiteralsInt32(dc DictionaryConverter, vals []int32, remain int, buf []IndexType, run BitRun, bitRdr BitRunReader) (int, int, BitRun, error) {
	want := MinInt(remain, int(r.litCount))
	want = MinInt(want, len(buf))
	indexes := buf[:want]

	// The error result is discarded; a short read is detected by comparing
	// the returned count with the requested one.
	got, _ := r.r.GetBatchIndex(uint(r.bitWidth), indexes)
	if got != want {
		return 0, 0, run, xerrors.New("was not able to retrieve correct number of indexes")
	}
	if !dc.IsValid(indexes...) {
		return 0, 0, run, xerrors.New("invalid index values found for dictionary converter")
	}

	copied, nulls := 0, 0
	for copied < want {
		if !run.Set {
			// Null run: fill the whole run and move past it.
			gap := int(run.Len)
			dc.FillZero(vals[:gap])
			vals = vals[gap:]
			nulls += gap
			run.Len = 0
		} else {
			// Valid run: copy as many indexes as both the run and the
			// decoded chunk allow.
			step := MinInt(want-copied, int(run.Len))
			if err := dc.Copy(vals, indexes[copied:copied+step]); err != nil {
				return 0, 0, run, err
			}
			copied += step
			vals = vals[step:]
			run.Len -= int64(step)
		}

		if run.Len == 0 {
			run = bitRdr.NextRun()
		}
	}

	r.litCount -= int32(want)
	return copied, nulls, run, nil
}
// GetBatchWithDictInt32 decodes up to len(vals) dictionary indexes and hands
// them to dc to populate vals.
//
// Repeated runs are written with dc.Fill; literal runs are read in chunks of
// at most len(indexbuffer) indexes and written with dc.Copy.
//
// It returns the number of values written. The count is short, with a nil
// error, when r.Next reports that no further run is available, when fewer
// indexes than requested could be read, or when dc.IsValid rejects an index.
// Errors reported by dc.Fill or dc.Copy are returned to the caller.
func (r *RleDecoder) GetBatchWithDictInt32(dc DictionaryConverter, vals []int32) (int, error) {
	var indexbuffer [1024]IndexType

	read, size := 0, len(vals)
	for read < size {
		remain := size - read
		switch {
		case r.repCount > 0:
			idx := IndexType(r.curVal)
			if !dc.IsValid(idx) {
				return read, nil
			}
			batch := MinInt(remain, int(r.repCount))
			if err := dc.Fill(vals[:batch], idx); err != nil {
				return read, err
			}
			r.repCount -= int32(batch)
			read += batch
			vals = vals[batch:]
		case r.litCount > 0:
			litbatch := MinInt(MinInt(remain, int(r.litCount)), len(indexbuffer))
			buf := indexbuffer[:litbatch]
			// The error result is discarded; a short read is detected
			// through the returned count.
			n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
			if n != litbatch {
				return read, nil
			}
			if !dc.IsValid(buf...) {
				return read, nil
			}
			if err := dc.Copy(vals, buf); err != nil {
				// Report the failure rather than a silent short read.
				return read, err
			}
			r.litCount -= int32(litbatch)
			read += litbatch
			vals = vals[litbatch:]
		default:
			// Neither kind of run is pending; ask the decoder for the next.
			if !r.Next() {
				return read, nil
			}
		}
	}
	return read, nil
}
// GetBatchWithDictSpacedInt64 decodes dictionary indexes into vals via dc while
// honouring the validity bitmap validBits, read from bit validBitsOffset.
//
// With nullCount == 0 the call is forwarded to GetBatchWithDictInt64. Otherwise
// the bitmap is consumed block by block: a block with every bit set uses
// GetBatchWithDictInt64, a block with no bit set is passed to dc.FillZero, and
// a mixed block is delegated to getspacedInt64.
//
// The result is the number of slots of vals processed, nulls included.
// Decoding stops without an error once a block is only partially processed; a
// helper error is returned along with the total from the preceding blocks.
func (r *RleDecoder) GetBatchWithDictSpacedInt64(dc DictionaryConverter, vals []int64, nullCount int, validBits []byte, validBitsOffset int64) (totalProcessed int, err error) {
	if nullCount == 0 {
		return r.GetBatchWithDictInt64(dc, vals)
	}

	counter := NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
	for blk := counter.NextFourWords(); blk.Len != 0; blk = counter.NextFourWords() {
		blockLen := int(blk.Len)

		var done int
		if blk.AllSet() {
			done, err = r.GetBatchWithDictInt64(dc, vals[:blockLen])
		} else if blk.NoneSet() {
			dc.FillZero(vals[:blockLen])
			done = blockLen
		} else {
			done, err = r.getspacedInt64(dc, vals, blockLen, blockLen-int(blk.Popcnt), validBits, validBitsOffset)
		}
		if err != nil {
			// The failing block's count is intentionally not added.
			return totalProcessed, err
		}

		totalProcessed += done
		if done != blockLen {
			// Short block: report what was decoded so far.
			return totalProcessed, nil
		}

		// Advance the output window and bitmap cursor past this block.
		vals = vals[blockLen:]
		validBitsOffset += int64(blockLen)
	}
	return totalProcessed, nil
}
// getspacedInt64 decodes one validity block of batchSize slots, nullCount of
// which are null according to validBits (read from bit validBitsOffset).
//
// Runs of unset validity bits are passed to dc.FillZero. For set bits, a
// pending repeated run is consumed through consumeRepeatCounts and written
// with dc.Fill, and a pending literal run is handled by consumeLiteralsInt64.
//
// It returns the number of slots of vals handled. The count is short, with a
// nil error, when r.Next reports that no further run is available or when
// dc.IsValid rejects the repeated index. Errors from dc.Fill and
// consumeLiteralsInt64 are returned to the caller.
func (r *RleDecoder) getspacedInt64(dc DictionaryConverter, vals []int64, batchSize, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
	if nullCount == batchSize {
		// Every slot is null: nothing needs to be decoded.
		dc.FillZero(vals[:batchSize])
		return batchSize, nil
	}

	read := 0
	// remain tracks how many non-null values are still to be decoded.
	remain := batchSize - nullCount

	const bufferSize = 1024
	var indexbuffer [bufferSize]IndexType

	// assume no bits to start
	bitReader := NewBitRunReader(validBits, validBitsOffset, int64(batchSize))
	validRun := bitReader.NextRun()
	for read < batchSize {
		if validRun.Len == 0 {
			validRun = bitReader.NextRun()
		}

		if !validRun.Set {
			// A run of nulls: fill the slots without touching the decoder.
			dc.FillZero(vals[:int(validRun.Len)])
			vals = vals[int(validRun.Len):]
			read += int(validRun.Len)
			validRun.Len = 0
			continue
		}

		// Both run counters are exhausted; ask the decoder for the next run.
		if r.repCount == 0 && r.litCount == 0 {
			if !r.Next() {
				return read, nil
			}
		}

		var batch int
		switch {
		case r.repCount > 0:
			batch, remain, validRun = r.consumeRepeatCounts(read, batchSize, remain, validRun, bitReader)
			current := IndexType(r.curVal)
			if !dc.IsValid(current) {
				return read, nil
			}
			// Surface fill failures instead of counting the slots as decoded.
			if err := dc.Fill(vals[:batch], current); err != nil {
				return read, err
			}
		case r.litCount > 0:
			var (
				litread, skipped int
				err              error
			)
			litread, skipped, validRun, err = r.consumeLiteralsInt64(dc, vals, remain, indexbuffer[:], validRun, bitReader)
			if err != nil {
				return read, err
			}
			// Nulls interleaved with the literals also occupy output slots.
			batch = litread + skipped
			remain -= litread
		}

		vals = vals[batch:]
		read += batch
	}
	return read, nil
}
// consumeLiteralsInt64 reads indexes from the current literal run and writes
// them into vals through dc, interleaving them with the null runs reported by
// run and bitRdr.
//
// At most min(remain, r.litCount, len(buf)) indexes are read, using buf as
// scratch space. It returns the number of indexes copied, the number of null
// slots passed to dc.FillZero, the validity run as it stands on return, and an
// error when too few indexes could be read, when dc.IsValid rejects them, or
// when dc.Copy fails. r.litCount is only reduced on success.
func (r *RleDecoder) consumeLiteralsInt64(dc DictionaryConverter, vals []int64, remain int, buf []IndexType, run BitRun, bitRdr BitRunReader) (int, int, BitRun, error) {
	want := MinInt(remain, int(r.litCount))
	want = MinInt(want, len(buf))
	indexes := buf[:want]

	// The error result is discarded; a short read is detected by comparing
	// the returned count with the requested one.
	got, _ := r.r.GetBatchIndex(uint(r.bitWidth), indexes)
	if got != want {
		return 0, 0, run, xerrors.New("was not able to retrieve correct number of indexes")
	}
	if !dc.IsValid(indexes...) {
		return 0, 0, run, xerrors.New("invalid index values found for dictionary converter")
	}

	copied, nulls := 0, 0
	for copied < want {
		if !run.Set {
			// Null run: fill the whole run and move past it.
			gap := int(run.Len)
			dc.FillZero(vals[:gap])
			vals = vals[gap:]
			nulls += gap
			run.Len = 0
		} else {
			// Valid run: copy as many indexes as both the run and the
			// decoded chunk allow.
			step := MinInt(want-copied, int(run.Len))
			if err := dc.Copy(vals, indexes[copied:copied+step]); err != nil {
				return 0, 0, run, err
			}
			copied += step
			vals = vals[step:]
			run.Len -= int64(step)
		}

		if run.Len == 0 {
			run = bitRdr.NextRun()
		}
	}

	r.litCount -= int32(want)
	return copied, nulls, run, nil
}
// GetBatchWithDictInt64 decodes up to len(vals) dictionary indexes and hands
// them to dc to populate vals.
//
// Repeated runs are written with dc.Fill; literal runs are read in chunks of
// at most len(indexbuffer) indexes and written with dc.Copy.
//
// It returns the number of values written. The count is short, with a nil
// error, when r.Next reports that no further run is available, when fewer
// indexes than requested could be read, or when dc.IsValid rejects an index.
// Errors reported by dc.Fill or dc.Copy are returned to the caller.
func (r *RleDecoder) GetBatchWithDictInt64(dc DictionaryConverter, vals []int64) (int, error) {
	var indexbuffer [1024]IndexType

	read, size := 0, len(vals)
	for read < size {
		remain := size - read
		switch {
		case r.repCount > 0:
			idx := IndexType(r.curVal)
			if !dc.IsValid(idx) {
				return read, nil
			}
			batch := MinInt(remain, int(r.repCount))
			if err := dc.Fill(vals[:batch], idx); err != nil {
				return read, err
			}
			r.repCount -= int32(batch)
			read += batch
			vals = vals[batch:]
		case r.litCount > 0:
			litbatch := MinInt(MinInt(remain, int(r.litCount)), len(indexbuffer))
			buf := indexbuffer[:litbatch]
			// The error result is discarded; a short read is detected
			// through the returned count.
			n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
			if n != litbatch {
				return read, nil
			}
			if !dc.IsValid(buf...) {
				return read, nil
			}
			if err := dc.Copy(vals, buf); err != nil {
				// Report the failure rather than a silent short read.
				return read, err
			}
			r.litCount -= int32(litbatch)
			read += litbatch
			vals = vals[litbatch:]
		default:
			// Neither kind of run is pending; ask the decoder for the next.
			if !r.Next() {
				return read, nil
			}
		}
	}
	return read, nil
}
// GetBatchWithDictSpacedInt96 decodes dictionary indexes into vals via dc while
// honouring the validity bitmap validBits, read from bit validBitsOffset.
//
// With nullCount == 0 the call is forwarded to GetBatchWithDictInt96. Otherwise
// the bitmap is consumed block by block: a block with every bit set uses
// GetBatchWithDictInt96, a block with no bit set is passed to dc.FillZero, and
// a mixed block is delegated to getspacedInt96.
//
// The result is the number of slots of vals processed, nulls included.
// Decoding stops without an error once a block is only partially processed; a
// helper error is returned along with the total from the preceding blocks.
func (r *RleDecoder) GetBatchWithDictSpacedInt96(dc DictionaryConverter, vals []parquet.Int96, nullCount int, validBits []byte, validBitsOffset int64) (totalProcessed int, err error) {
	if nullCount == 0 {
		return r.GetBatchWithDictInt96(dc, vals)
	}

	counter := NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
	for blk := counter.NextFourWords(); blk.Len != 0; blk = counter.NextFourWords() {
		blockLen := int(blk.Len)

		var done int
		if blk.AllSet() {
			done, err = r.GetBatchWithDictInt96(dc, vals[:blockLen])
		} else if blk.NoneSet() {
			dc.FillZero(vals[:blockLen])
			done = blockLen
		} else {
			done, err = r.getspacedInt96(dc, vals, blockLen, blockLen-int(blk.Popcnt), validBits, validBitsOffset)
		}
		if err != nil {
			// The failing block's count is intentionally not added.
			return totalProcessed, err
		}

		totalProcessed += done
		if done != blockLen {
			// Short block: report what was decoded so far.
			return totalProcessed, nil
		}

		// Advance the output window and bitmap cursor past this block.
		vals = vals[blockLen:]
		validBitsOffset += int64(blockLen)
	}
	return totalProcessed, nil
}
// getspacedInt96 decodes one validity block of batchSize slots, nullCount of
// which are null according to validBits (read from bit validBitsOffset).
//
// Runs of unset validity bits are passed to dc.FillZero. For set bits, a
// pending repeated run is consumed through consumeRepeatCounts and written
// with dc.Fill, and a pending literal run is handled by consumeLiteralsInt96.
//
// It returns the number of slots of vals handled. The count is short, with a
// nil error, when r.Next reports that no further run is available or when
// dc.IsValid rejects the repeated index. Errors from dc.Fill and
// consumeLiteralsInt96 are returned to the caller.
func (r *RleDecoder) getspacedInt96(dc DictionaryConverter, vals []parquet.Int96, batchSize, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
	if nullCount == batchSize {
		// Every slot is null: nothing needs to be decoded.
		dc.FillZero(vals[:batchSize])
		return batchSize, nil
	}

	read := 0
	// remain tracks how many non-null values are still to be decoded.
	remain := batchSize - nullCount

	const bufferSize = 1024
	var indexbuffer [bufferSize]IndexType

	// assume no bits to start
	bitReader := NewBitRunReader(validBits, validBitsOffset, int64(batchSize))
	validRun := bitReader.NextRun()
	for read < batchSize {
		if validRun.Len == 0 {
			validRun = bitReader.NextRun()
		}

		if !validRun.Set {
			// A run of nulls: fill the slots without touching the decoder.
			dc.FillZero(vals[:int(validRun.Len)])
			vals = vals[int(validRun.Len):]
			read += int(validRun.Len)
			validRun.Len = 0
			continue
		}

		// Both run counters are exhausted; ask the decoder for the next run.
		if r.repCount == 0 && r.litCount == 0 {
			if !r.Next() {
				return read, nil
			}
		}

		var batch int
		switch {
		case r.repCount > 0:
			batch, remain, validRun = r.consumeRepeatCounts(read, batchSize, remain, validRun, bitReader)
			current := IndexType(r.curVal)
			if !dc.IsValid(current) {
				return read, nil
			}
			// Surface fill failures instead of counting the slots as decoded.
			if err := dc.Fill(vals[:batch], current); err != nil {
				return read, err
			}
		case r.litCount > 0:
			var (
				litread, skipped int
				err              error
			)
			litread, skipped, validRun, err = r.consumeLiteralsInt96(dc, vals, remain, indexbuffer[:], validRun, bitReader)
			if err != nil {
				return read, err
			}
			// Nulls interleaved with the literals also occupy output slots.
			batch = litread + skipped
			remain -= litread
		}

		vals = vals[batch:]
		read += batch
	}
	return read, nil
}
// consumeLiteralsInt96 reads indexes from the current literal run and writes
// them into vals through dc, interleaving them with the null runs reported by
// run and bitRdr.
//
// At most min(remain, r.litCount, len(buf)) indexes are read, using buf as
// scratch space. It returns the number of indexes copied, the number of null
// slots passed to dc.FillZero, the validity run as it stands on return, and an
// error when too few indexes could be read, when dc.IsValid rejects them, or
// when dc.Copy fails. r.litCount is only reduced on success.
func (r *RleDecoder) consumeLiteralsInt96(dc DictionaryConverter, vals []parquet.Int96, remain int, buf []IndexType, run BitRun, bitRdr BitRunReader) (int, int, BitRun, error) {
	want := MinInt(remain, int(r.litCount))
	want = MinInt(want, len(buf))
	indexes := buf[:want]

	// The error result is discarded; a short read is detected by comparing
	// the returned count with the requested one.
	got, _ := r.r.GetBatchIndex(uint(r.bitWidth), indexes)
	if got != want {
		return 0, 0, run, xerrors.New("was not able to retrieve correct number of indexes")
	}
	if !dc.IsValid(indexes...) {
		return 0, 0, run, xerrors.New("invalid index values found for dictionary converter")
	}

	copied, nulls := 0, 0
	for copied < want {
		if !run.Set {
			// Null run: fill the whole run and move past it.
			gap := int(run.Len)
			dc.FillZero(vals[:gap])
			vals = vals[gap:]
			nulls += gap
			run.Len = 0
		} else {
			// Valid run: copy as many indexes as both the run and the
			// decoded chunk allow.
			step := MinInt(want-copied, int(run.Len))
			if err := dc.Copy(vals, indexes[copied:copied+step]); err != nil {
				return 0, 0, run, err
			}
			copied += step
			vals = vals[step:]
			run.Len -= int64(step)
		}

		if run.Len == 0 {
			run = bitRdr.NextRun()
		}
	}

	r.litCount -= int32(want)
	return copied, nulls, run, nil
}
// GetBatchWithDictInt96 decodes up to len(vals) dictionary indexes and hands
// them to dc to populate vals.
//
// Repeated runs are written with dc.Fill; literal runs are read in chunks of
// at most len(indexbuffer) indexes and written with dc.Copy.
//
// It returns the number of values written. The count is short, with a nil
// error, when r.Next reports that no further run is available, when fewer
// indexes than requested could be read, or when dc.IsValid rejects an index.
// Errors reported by dc.Fill or dc.Copy are returned to the caller.
func (r *RleDecoder) GetBatchWithDictInt96(dc DictionaryConverter, vals []parquet.Int96) (int, error) {
	var indexbuffer [1024]IndexType

	read, size := 0, len(vals)
	for read < size {
		remain := size - read
		switch {
		case r.repCount > 0:
			idx := IndexType(r.curVal)
			if !dc.IsValid(idx) {
				return read, nil
			}
			batch := MinInt(remain, int(r.repCount))
			if err := dc.Fill(vals[:batch], idx); err != nil {
				return read, err
			}
			r.repCount -= int32(batch)
			read += batch
			vals = vals[batch:]
		case r.litCount > 0:
			litbatch := MinInt(MinInt(remain, int(r.litCount)), len(indexbuffer))
			buf := indexbuffer[:litbatch]
			// The error result is discarded; a short read is detected
			// through the returned count.
			n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
			if n != litbatch {
				return read, nil
			}
			if !dc.IsValid(buf...) {
				return read, nil
			}
			if err := dc.Copy(vals, buf); err != nil {
				// Report the failure rather than a silent short read.
				return read, err
			}
			r.litCount -= int32(litbatch)
			read += litbatch
			vals = vals[litbatch:]
		default:
			// Neither kind of run is pending; ask the decoder for the next.
			if !r.Next() {
				return read, nil
			}
		}
	}
	return read, nil
}
// GetBatchWithDictSpacedFloat32 decodes dictionary indexes into vals via dc
// while honouring the validity bitmap validBits, read from bit
// validBitsOffset.
//
// With nullCount == 0 the call is forwarded to GetBatchWithDictFloat32.
// Otherwise the bitmap is consumed block by block: a block with every bit set
// uses GetBatchWithDictFloat32, a block with no bit set is passed to
// dc.FillZero, and a mixed block is delegated to getspacedFloat32.
//
// The result is the number of slots of vals processed, nulls included.
// Decoding stops without an error once a block is only partially processed; a
// helper error is returned along with the total from the preceding blocks.
func (r *RleDecoder) GetBatchWithDictSpacedFloat32(dc DictionaryConverter, vals []float32, nullCount int, validBits []byte, validBitsOffset int64) (totalProcessed int, err error) {
	if nullCount == 0 {
		return r.GetBatchWithDictFloat32(dc, vals)
	}

	counter := NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
	for blk := counter.NextFourWords(); blk.Len != 0; blk = counter.NextFourWords() {
		blockLen := int(blk.Len)

		var done int
		if blk.AllSet() {
			done, err = r.GetBatchWithDictFloat32(dc, vals[:blockLen])
		} else if blk.NoneSet() {
			dc.FillZero(vals[:blockLen])
			done = blockLen
		} else {
			done, err = r.getspacedFloat32(dc, vals, blockLen, blockLen-int(blk.Popcnt), validBits, validBitsOffset)
		}
		if err != nil {
			// The failing block's count is intentionally not added.
			return totalProcessed, err
		}

		totalProcessed += done
		if done != blockLen {
			// Short block: report what was decoded so far.
			return totalProcessed, nil
		}

		// Advance the output window and bitmap cursor past this block.
		vals = vals[blockLen:]
		validBitsOffset += int64(blockLen)
	}
	return totalProcessed, nil
}
// getspacedFloat32 decodes one validity block of batchSize slots, nullCount of
// which are null according to validBits (read from bit validBitsOffset).
//
// Runs of unset validity bits are passed to dc.FillZero. For set bits, a
// pending repeated run is consumed through consumeRepeatCounts and written
// with dc.Fill, and a pending literal run is handled by
// consumeLiteralsFloat32.
//
// It returns the number of slots of vals handled. The count is short, with a
// nil error, when r.Next reports that no further run is available or when
// dc.IsValid rejects the repeated index. Errors from dc.Fill and
// consumeLiteralsFloat32 are returned to the caller.
func (r *RleDecoder) getspacedFloat32(dc DictionaryConverter, vals []float32, batchSize, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
	if nullCount == batchSize {
		// Every slot is null: nothing needs to be decoded.
		dc.FillZero(vals[:batchSize])
		return batchSize, nil
	}

	read := 0
	// remain tracks how many non-null values are still to be decoded.
	remain := batchSize - nullCount

	const bufferSize = 1024
	var indexbuffer [bufferSize]IndexType

	// assume no bits to start
	bitReader := NewBitRunReader(validBits, validBitsOffset, int64(batchSize))
	validRun := bitReader.NextRun()
	for read < batchSize {
		if validRun.Len == 0 {
			validRun = bitReader.NextRun()
		}

		if !validRun.Set {
			// A run of nulls: fill the slots without touching the decoder.
			dc.FillZero(vals[:int(validRun.Len)])
			vals = vals[int(validRun.Len):]
			read += int(validRun.Len)
			validRun.Len = 0
			continue
		}

		// Both run counters are exhausted; ask the decoder for the next run.
		if r.repCount == 0 && r.litCount == 0 {
			if !r.Next() {
				return read, nil
			}
		}

		var batch int
		switch {
		case r.repCount > 0:
			batch, remain, validRun = r.consumeRepeatCounts(read, batchSize, remain, validRun, bitReader)
			current := IndexType(r.curVal)
			if !dc.IsValid(current) {
				return read, nil
			}
			// Surface fill failures instead of counting the slots as decoded.
			if err := dc.Fill(vals[:batch], current); err != nil {
				return read, err
			}
		case r.litCount > 0:
			var (
				litread, skipped int
				err              error
			)
			litread, skipped, validRun, err = r.consumeLiteralsFloat32(dc, vals, remain, indexbuffer[:], validRun, bitReader)
			if err != nil {
				return read, err
			}
			// Nulls interleaved with the literals also occupy output slots.
			batch = litread + skipped
			remain -= litread
		}

		vals = vals[batch:]
		read += batch
	}
	return read, nil
}
// consumeLiteralsFloat32 reads indexes from the current literal run and writes
// them into vals through dc, interleaving them with the null runs reported by
// run and bitRdr.
//
// At most min(remain, r.litCount, len(buf)) indexes are read, using buf as
// scratch space. It returns the number of indexes copied, the number of null
// slots passed to dc.FillZero, the validity run as it stands on return, and an
// error when too few indexes could be read, when dc.IsValid rejects them, or
// when dc.Copy fails. r.litCount is only reduced on success.
func (r *RleDecoder) consumeLiteralsFloat32(dc DictionaryConverter, vals []float32, remain int, buf []IndexType, run BitRun, bitRdr BitRunReader) (int, int, BitRun, error) {
	want := MinInt(remain, int(r.litCount))
	want = MinInt(want, len(buf))
	indexes := buf[:want]

	// The error result is discarded; a short read is detected by comparing
	// the returned count with the requested one.
	got, _ := r.r.GetBatchIndex(uint(r.bitWidth), indexes)
	if got != want {
		return 0, 0, run, xerrors.New("was not able to retrieve correct number of indexes")
	}
	if !dc.IsValid(indexes...) {
		return 0, 0, run, xerrors.New("invalid index values found for dictionary converter")
	}

	copied, nulls := 0, 0
	for copied < want {
		if !run.Set {
			// Null run: fill the whole run and move past it.
			gap := int(run.Len)
			dc.FillZero(vals[:gap])
			vals = vals[gap:]
			nulls += gap
			run.Len = 0
		} else {
			// Valid run: copy as many indexes as both the run and the
			// decoded chunk allow.
			step := MinInt(want-copied, int(run.Len))
			if err := dc.Copy(vals, indexes[copied:copied+step]); err != nil {
				return 0, 0, run, err
			}
			copied += step
			vals = vals[step:]
			run.Len -= int64(step)
		}

		if run.Len == 0 {
			run = bitRdr.NextRun()
		}
	}

	r.litCount -= int32(want)
	return copied, nulls, run, nil
}
// GetBatchWithDictFloat32 decodes up to len(vals) dictionary indexes and hands
// them to dc to populate vals.
//
// Repeated runs are written with dc.Fill; literal runs are read in chunks of
// at most len(indexbuffer) indexes and written with dc.Copy.
//
// It returns the number of values written. The count is short, with a nil
// error, when r.Next reports that no further run is available, when fewer
// indexes than requested could be read, or when dc.IsValid rejects an index.
// Errors reported by dc.Fill or dc.Copy are returned to the caller.
func (r *RleDecoder) GetBatchWithDictFloat32(dc DictionaryConverter, vals []float32) (int, error) {
	var indexbuffer [1024]IndexType

	read, size := 0, len(vals)
	for read < size {
		remain := size - read
		switch {
		case r.repCount > 0:
			idx := IndexType(r.curVal)
			if !dc.IsValid(idx) {
				return read, nil
			}
			batch := MinInt(remain, int(r.repCount))
			if err := dc.Fill(vals[:batch], idx); err != nil {
				return read, err
			}
			r.repCount -= int32(batch)
			read += batch
			vals = vals[batch:]
		case r.litCount > 0:
			litbatch := MinInt(MinInt(remain, int(r.litCount)), len(indexbuffer))
			buf := indexbuffer[:litbatch]
			// The error result is discarded; a short read is detected
			// through the returned count.
			n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
			if n != litbatch {
				return read, nil
			}
			if !dc.IsValid(buf...) {
				return read, nil
			}
			if err := dc.Copy(vals, buf); err != nil {
				// Report the failure rather than a silent short read.
				return read, err
			}
			r.litCount -= int32(litbatch)
			read += litbatch
			vals = vals[litbatch:]
		default:
			// Neither kind of run is pending; ask the decoder for the next.
			if !r.Next() {
				return read, nil
			}
		}
	}
	return read, nil
}
// GetBatchWithDictSpacedFloat64 decodes dictionary indexes into vals via dc
// while honouring the validity bitmap validBits, read from bit
// validBitsOffset.
//
// With nullCount == 0 the call is forwarded to GetBatchWithDictFloat64.
// Otherwise the bitmap is consumed block by block: a block with every bit set
// uses GetBatchWithDictFloat64, a block with no bit set is passed to
// dc.FillZero, and a mixed block is delegated to getspacedFloat64.
//
// The result is the number of slots of vals processed, nulls included.
// Decoding stops without an error once a block is only partially processed; a
// helper error is returned along with the total from the preceding blocks.
func (r *RleDecoder) GetBatchWithDictSpacedFloat64(dc DictionaryConverter, vals []float64, nullCount int, validBits []byte, validBitsOffset int64) (totalProcessed int, err error) {
	if nullCount == 0 {
		return r.GetBatchWithDictFloat64(dc, vals)
	}

	counter := NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
	for blk := counter.NextFourWords(); blk.Len != 0; blk = counter.NextFourWords() {
		blockLen := int(blk.Len)

		var done int
		if blk.AllSet() {
			done, err = r.GetBatchWithDictFloat64(dc, vals[:blockLen])
		} else if blk.NoneSet() {
			dc.FillZero(vals[:blockLen])
			done = blockLen
		} else {
			done, err = r.getspacedFloat64(dc, vals, blockLen, blockLen-int(blk.Popcnt), validBits, validBitsOffset)
		}
		if err != nil {
			// The failing block's count is intentionally not added.
			return totalProcessed, err
		}

		totalProcessed += done
		if done != blockLen {
			// Short block: report what was decoded so far.
			return totalProcessed, nil
		}

		// Advance the output window and bitmap cursor past this block.
		vals = vals[blockLen:]
		validBitsOffset += int64(blockLen)
	}
	return totalProcessed, nil
}
// getspacedFloat64 decodes one validity block of batchSize slots, nullCount of
// which are null according to validBits (read from bit validBitsOffset).
//
// Runs of unset validity bits are passed to dc.FillZero. For set bits, a
// pending repeated run is consumed through consumeRepeatCounts and written
// with dc.Fill, and a pending literal run is handled by
// consumeLiteralsFloat64.
//
// It returns the number of slots of vals handled. The count is short, with a
// nil error, when r.Next reports that no further run is available or when
// dc.IsValid rejects the repeated index. Errors from dc.Fill and
// consumeLiteralsFloat64 are returned to the caller.
func (r *RleDecoder) getspacedFloat64(dc DictionaryConverter, vals []float64, batchSize, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
	if nullCount == batchSize {
		// Every slot is null: nothing needs to be decoded.
		dc.FillZero(vals[:batchSize])
		return batchSize, nil
	}

	read := 0
	// remain tracks how many non-null values are still to be decoded.
	remain := batchSize - nullCount

	const bufferSize = 1024
	var indexbuffer [bufferSize]IndexType

	// assume no bits to start
	bitReader := NewBitRunReader(validBits, validBitsOffset, int64(batchSize))
	validRun := bitReader.NextRun()
	for read < batchSize {
		if validRun.Len == 0 {
			validRun = bitReader.NextRun()
		}

		if !validRun.Set {
			// A run of nulls: fill the slots without touching the decoder.
			dc.FillZero(vals[:int(validRun.Len)])
			vals = vals[int(validRun.Len):]
			read += int(validRun.Len)
			validRun.Len = 0
			continue
		}

		// Both run counters are exhausted; ask the decoder for the next run.
		if r.repCount == 0 && r.litCount == 0 {
			if !r.Next() {
				return read, nil
			}
		}

		var batch int
		switch {
		case r.repCount > 0:
			batch, remain, validRun = r.consumeRepeatCounts(read, batchSize, remain, validRun, bitReader)
			current := IndexType(r.curVal)
			if !dc.IsValid(current) {
				return read, nil
			}
			// Surface fill failures instead of counting the slots as decoded.
			if err := dc.Fill(vals[:batch], current); err != nil {
				return read, err
			}
		case r.litCount > 0:
			var (
				litread, skipped int
				err              error
			)
			litread, skipped, validRun, err = r.consumeLiteralsFloat64(dc, vals, remain, indexbuffer[:], validRun, bitReader)
			if err != nil {
				return read, err
			}
			// Nulls interleaved with the literals also occupy output slots.
			batch = litread + skipped
			remain -= litread
		}

		vals = vals[batch:]
		read += batch
	}
	return read, nil
}
// consumeLiteralsFloat64 reads indexes from the current literal run and writes
// them into vals through dc, interleaving them with the null runs reported by
// run and bitRdr.
//
// At most min(remain, r.litCount, len(buf)) indexes are read, using buf as
// scratch space. It returns the number of indexes copied, the number of null
// slots passed to dc.FillZero, the validity run as it stands on return, and an
// error when too few indexes could be read, when dc.IsValid rejects them, or
// when dc.Copy fails. r.litCount is only reduced on success.
func (r *RleDecoder) consumeLiteralsFloat64(dc DictionaryConverter, vals []float64, remain int, buf []IndexType, run BitRun, bitRdr BitRunReader) (int, int, BitRun, error) {
	want := MinInt(remain, int(r.litCount))
	want = MinInt(want, len(buf))
	indexes := buf[:want]

	// The error result is discarded; a short read is detected by comparing
	// the returned count with the requested one.
	got, _ := r.r.GetBatchIndex(uint(r.bitWidth), indexes)
	if got != want {
		return 0, 0, run, xerrors.New("was not able to retrieve correct number of indexes")
	}
	if !dc.IsValid(indexes...) {
		return 0, 0, run, xerrors.New("invalid index values found for dictionary converter")
	}

	copied, nulls := 0, 0
	for copied < want {
		if !run.Set {
			// Null run: fill the whole run and move past it.
			gap := int(run.Len)
			dc.FillZero(vals[:gap])
			vals = vals[gap:]
			nulls += gap
			run.Len = 0
		} else {
			// Valid run: copy as many indexes as both the run and the
			// decoded chunk allow.
			step := MinInt(want-copied, int(run.Len))
			if err := dc.Copy(vals, indexes[copied:copied+step]); err != nil {
				return 0, 0, run, err
			}
			copied += step
			vals = vals[step:]
			run.Len -= int64(step)
		}

		if run.Len == 0 {
			run = bitRdr.NextRun()
		}
	}

	r.litCount -= int32(want)
	return copied, nulls, run, nil
}
// GetBatchWithDictFloat64 decodes up to len(vals) dictionary indexes and hands
// them to dc to populate vals.
//
// Repeated runs are written with dc.Fill; literal runs are read in chunks of
// at most len(indexbuffer) indexes and written with dc.Copy.
//
// It returns the number of values written. The count is short, with a nil
// error, when r.Next reports that no further run is available, when fewer
// indexes than requested could be read, or when dc.IsValid rejects an index.
// Errors reported by dc.Fill or dc.Copy are returned to the caller.
func (r *RleDecoder) GetBatchWithDictFloat64(dc DictionaryConverter, vals []float64) (int, error) {
	var indexbuffer [1024]IndexType

	read, size := 0, len(vals)
	for read < size {
		remain := size - read
		switch {
		case r.repCount > 0:
			idx := IndexType(r.curVal)
			if !dc.IsValid(idx) {
				return read, nil
			}
			batch := MinInt(remain, int(r.repCount))
			if err := dc.Fill(vals[:batch], idx); err != nil {
				return read, err
			}
			r.repCount -= int32(batch)
			read += batch
			vals = vals[batch:]
		case r.litCount > 0:
			litbatch := MinInt(MinInt(remain, int(r.litCount)), len(indexbuffer))
			buf := indexbuffer[:litbatch]
			// The error result is discarded; a short read is detected
			// through the returned count.
			n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
			if n != litbatch {
				return read, nil
			}
			if !dc.IsValid(buf...) {
				return read, nil
			}
			if err := dc.Copy(vals, buf); err != nil {
				// Report the failure rather than a silent short read.
				return read, err
			}
			r.litCount -= int32(litbatch)
			read += litbatch
			vals = vals[litbatch:]
		default:
			// Neither kind of run is pending; ask the decoder for the next.
			if !r.Next() {
				return read, nil
			}
		}
	}
	return read, nil
}
// GetBatchWithDictSpacedByteArray decodes dictionary indexes into vals via dc
// while honouring the validity bitmap validBits, read from bit
// validBitsOffset.
//
// With nullCount == 0 the call is forwarded to GetBatchWithDictByteArray.
// Otherwise the bitmap is consumed block by block: a block with every bit set
// uses GetBatchWithDictByteArray, a block with no bit set is passed to
// dc.FillZero, and a mixed block is delegated to getspacedByteArray.
//
// The result is the number of slots of vals processed, nulls included.
// Decoding stops without an error once a block is only partially processed; a
// helper error is returned along with the total from the preceding blocks.
func (r *RleDecoder) GetBatchWithDictSpacedByteArray(dc DictionaryConverter, vals []parquet.ByteArray, nullCount int, validBits []byte, validBitsOffset int64) (totalProcessed int, err error) {
	if nullCount == 0 {
		return r.GetBatchWithDictByteArray(dc, vals)
	}

	counter := NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
	for blk := counter.NextFourWords(); blk.Len != 0; blk = counter.NextFourWords() {
		blockLen := int(blk.Len)

		var done int
		if blk.AllSet() {
			done, err = r.GetBatchWithDictByteArray(dc, vals[:blockLen])
		} else if blk.NoneSet() {
			dc.FillZero(vals[:blockLen])
			done = blockLen
		} else {
			done, err = r.getspacedByteArray(dc, vals, blockLen, blockLen-int(blk.Popcnt), validBits, validBitsOffset)
		}
		if err != nil {
			// The failing block's count is intentionally not added.
			return totalProcessed, err
		}

		totalProcessed += done
		if done != blockLen {
			// Short block: report what was decoded so far.
			return totalProcessed, nil
		}

		// Advance the output window and bitmap cursor past this block.
		vals = vals[blockLen:]
		validBitsOffset += int64(blockLen)
	}
	return totalProcessed, nil
}
// getspacedByteArray decodes one validity block of batchSize slots, nullCount
// of which are null according to validBits (read from bit validBitsOffset).
//
// Runs of unset validity bits are passed to dc.FillZero. For set bits, a
// pending repeated run is consumed through consumeRepeatCounts and written
// with dc.Fill, and a pending literal run is handled by
// consumeLiteralsByteArray.
//
// It returns the number of slots of vals handled. The count is short, with a
// nil error, when r.Next reports that no further run is available or when
// dc.IsValid rejects the repeated index. Errors from dc.Fill and
// consumeLiteralsByteArray are returned to the caller.
func (r *RleDecoder) getspacedByteArray(dc DictionaryConverter, vals []parquet.ByteArray, batchSize, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
	if nullCount == batchSize {
		// Every slot is null: nothing needs to be decoded.
		dc.FillZero(vals[:batchSize])
		return batchSize, nil
	}

	read := 0
	// remain tracks how many non-null values are still to be decoded.
	remain := batchSize - nullCount

	const bufferSize = 1024
	var indexbuffer [bufferSize]IndexType

	// assume no bits to start
	bitReader := NewBitRunReader(validBits, validBitsOffset, int64(batchSize))
	validRun := bitReader.NextRun()
	for read < batchSize {
		if validRun.Len == 0 {
			validRun = bitReader.NextRun()
		}

		if !validRun.Set {
			// A run of nulls: fill the slots without touching the decoder.
			dc.FillZero(vals[:int(validRun.Len)])
			vals = vals[int(validRun.Len):]
			read += int(validRun.Len)
			validRun.Len = 0
			continue
		}

		// Both run counters are exhausted; ask the decoder for the next run.
		if r.repCount == 0 && r.litCount == 0 {
			if !r.Next() {
				return read, nil
			}
		}

		var batch int
		switch {
		case r.repCount > 0:
			batch, remain, validRun = r.consumeRepeatCounts(read, batchSize, remain, validRun, bitReader)
			current := IndexType(r.curVal)
			if !dc.IsValid(current) {
				return read, nil
			}
			// Surface fill failures instead of counting the slots as decoded.
			if err := dc.Fill(vals[:batch], current); err != nil {
				return read, err
			}
		case r.litCount > 0:
			var (
				litread, skipped int
				err              error
			)
			litread, skipped, validRun, err = r.consumeLiteralsByteArray(dc, vals, remain, indexbuffer[:], validRun, bitReader)
			if err != nil {
				return read, err
			}
			// Nulls interleaved with the literals also occupy output slots.
			batch = litread + skipped
			remain -= litread
		}

		vals = vals[batch:]
		read += batch
	}
	return read, nil
}
// consumeLiteralsByteArray reads indexes from the current literal run and
// writes them into vals through dc, interleaving them with the null runs
// reported by run and bitRdr.
//
// At most min(remain, r.litCount, len(buf)) indexes are read, using buf as
// scratch space. It returns the number of indexes copied, the number of null
// slots passed to dc.FillZero, the validity run as it stands on return, and an
// error when too few indexes could be read, when dc.IsValid rejects them, or
// when dc.Copy fails. r.litCount is only reduced on success.
func (r *RleDecoder) consumeLiteralsByteArray(dc DictionaryConverter, vals []parquet.ByteArray, remain int, buf []IndexType, run BitRun, bitRdr BitRunReader) (int, int, BitRun, error) {
	want := MinInt(remain, int(r.litCount))
	want = MinInt(want, len(buf))
	indexes := buf[:want]

	// The error result is discarded; a short read is detected by comparing
	// the returned count with the requested one.
	got, _ := r.r.GetBatchIndex(uint(r.bitWidth), indexes)
	if got != want {
		return 0, 0, run, xerrors.New("was not able to retrieve correct number of indexes")
	}
	if !dc.IsValid(indexes...) {
		return 0, 0, run, xerrors.New("invalid index values found for dictionary converter")
	}

	copied, nulls := 0, 0
	for copied < want {
		if !run.Set {
			// Null run: fill the whole run and move past it.
			gap := int(run.Len)
			dc.FillZero(vals[:gap])
			vals = vals[gap:]
			nulls += gap
			run.Len = 0
		} else {
			// Valid run: copy as many indexes as both the run and the
			// decoded chunk allow.
			step := MinInt(want-copied, int(run.Len))
			if err := dc.Copy(vals, indexes[copied:copied+step]); err != nil {
				return 0, 0, run, err
			}
			copied += step
			vals = vals[step:]
			run.Len -= int64(step)
		}

		if run.Len == 0 {
			run = bitRdr.NextRun()
		}
	}

	r.litCount -= int32(want)
	return copied, nulls, run, nil
}
// GetBatchWithDictByteArray decodes up to len(vals) dictionary indexes and
// hands them to dc to populate vals.
//
// Repeated runs are written with dc.Fill; literal runs are read in chunks of
// at most len(indexbuffer) indexes and written with dc.Copy.
//
// It returns the number of values written. The count is short, with a nil
// error, when r.Next reports that no further run is available, when fewer
// indexes than requested could be read, or when dc.IsValid rejects an index.
// Errors reported by dc.Fill or dc.Copy are returned to the caller.
func (r *RleDecoder) GetBatchWithDictByteArray(dc DictionaryConverter, vals []parquet.ByteArray) (int, error) {
	var indexbuffer [1024]IndexType

	read, size := 0, len(vals)
	for read < size {
		remain := size - read
		switch {
		case r.repCount > 0:
			idx := IndexType(r.curVal)
			if !dc.IsValid(idx) {
				return read, nil
			}
			batch := MinInt(remain, int(r.repCount))
			if err := dc.Fill(vals[:batch], idx); err != nil {
				return read, err
			}
			r.repCount -= int32(batch)
			read += batch
			vals = vals[batch:]
		case r.litCount > 0:
			litbatch := MinInt(MinInt(remain, int(r.litCount)), len(indexbuffer))
			buf := indexbuffer[:litbatch]
			// The error result is discarded; a short read is detected
			// through the returned count.
			n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
			if n != litbatch {
				return read, nil
			}
			if !dc.IsValid(buf...) {
				return read, nil
			}
			if err := dc.Copy(vals, buf); err != nil {
				// Report the failure rather than a silent short read.
				return read, err
			}
			r.litCount -= int32(litbatch)
			read += litbatch
			vals = vals[litbatch:]
		default:
			// Neither kind of run is pending; ask the decoder for the next.
			if !r.Next() {
				return read, nil
			}
		}
	}
	return read, nil
}
// GetBatchWithDictSpacedFixedLenByteArray decodes dictionary indexes into vals
// via dc while honouring the validity bitmap validBits, read from bit
// validBitsOffset.
//
// With nullCount == 0 the call is forwarded to
// GetBatchWithDictFixedLenByteArray. Otherwise the bitmap is consumed block by
// block: a block with every bit set uses GetBatchWithDictFixedLenByteArray, a
// block with no bit set is passed to dc.FillZero, and a mixed block is
// delegated to getspacedFixedLenByteArray.
//
// The result is the number of slots of vals processed, nulls included.
// Decoding stops without an error once a block is only partially processed; a
// helper error is returned along with the total from the preceding blocks.
func (r *RleDecoder) GetBatchWithDictSpacedFixedLenByteArray(dc DictionaryConverter, vals []parquet.FixedLenByteArray, nullCount int, validBits []byte, validBitsOffset int64) (totalProcessed int, err error) {
	if nullCount == 0 {
		return r.GetBatchWithDictFixedLenByteArray(dc, vals)
	}

	counter := NewBitBlockCounter(validBits, validBitsOffset, int64(len(vals)))
	for blk := counter.NextFourWords(); blk.Len != 0; blk = counter.NextFourWords() {
		blockLen := int(blk.Len)

		var done int
		if blk.AllSet() {
			done, err = r.GetBatchWithDictFixedLenByteArray(dc, vals[:blockLen])
		} else if blk.NoneSet() {
			dc.FillZero(vals[:blockLen])
			done = blockLen
		} else {
			done, err = r.getspacedFixedLenByteArray(dc, vals, blockLen, blockLen-int(blk.Popcnt), validBits, validBitsOffset)
		}
		if err != nil {
			// The failing block's count is intentionally not added.
			return totalProcessed, err
		}

		totalProcessed += done
		if done != blockLen {
			// Short block: report what was decoded so far.
			return totalProcessed, nil
		}

		// Advance the output window and bitmap cursor past this block.
		vals = vals[blockLen:]
		validBitsOffset += int64(blockLen)
	}
	return totalProcessed, nil
}
// getspacedFixedLenByteArray decodes one validity block of batchSize slots,
// nullCount of which are null according to validBits (read from bit
// validBitsOffset).
//
// Runs of unset validity bits are passed to dc.FillZero. For set bits, a
// pending repeated run is consumed through consumeRepeatCounts and written
// with dc.Fill, and a pending literal run is handled by
// consumeLiteralsFixedLenByteArray.
//
// It returns the number of slots of vals handled. The count is short, with a
// nil error, when r.Next reports that no further run is available or when
// dc.IsValid rejects the repeated index. Errors from dc.Fill and
// consumeLiteralsFixedLenByteArray are returned to the caller.
func (r *RleDecoder) getspacedFixedLenByteArray(dc DictionaryConverter, vals []parquet.FixedLenByteArray, batchSize, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
	if nullCount == batchSize {
		// Every slot is null: nothing needs to be decoded.
		dc.FillZero(vals[:batchSize])
		return batchSize, nil
	}

	read := 0
	// remain tracks how many non-null values are still to be decoded.
	remain := batchSize - nullCount

	const bufferSize = 1024
	var indexbuffer [bufferSize]IndexType

	// assume no bits to start
	bitReader := NewBitRunReader(validBits, validBitsOffset, int64(batchSize))
	validRun := bitReader.NextRun()
	for read < batchSize {
		if validRun.Len == 0 {
			validRun = bitReader.NextRun()
		}

		if !validRun.Set {
			// A run of nulls: fill the slots without touching the decoder.
			dc.FillZero(vals[:int(validRun.Len)])
			vals = vals[int(validRun.Len):]
			read += int(validRun.Len)
			validRun.Len = 0
			continue
		}

		// Both run counters are exhausted; ask the decoder for the next run.
		if r.repCount == 0 && r.litCount == 0 {
			if !r.Next() {
				return read, nil
			}
		}

		var batch int
		switch {
		case r.repCount > 0:
			batch, remain, validRun = r.consumeRepeatCounts(read, batchSize, remain, validRun, bitReader)
			current := IndexType(r.curVal)
			if !dc.IsValid(current) {
				return read, nil
			}
			// Surface fill failures instead of counting the slots as decoded.
			if err := dc.Fill(vals[:batch], current); err != nil {
				return read, err
			}
		case r.litCount > 0:
			var (
				litread, skipped int
				err              error
			)
			litread, skipped, validRun, err = r.consumeLiteralsFixedLenByteArray(dc, vals, remain, indexbuffer[:], validRun, bitReader)
			if err != nil {
				return read, err
			}
			// Nulls interleaved with the literals also occupy output slots.
			batch = litread + skipped
			remain -= litread
		}

		vals = vals[batch:]
		read += batch
	}
	return read, nil
}
// consumeLiteralsFixedLenByteArray reads indexes from the current literal run
// and writes them into vals through dc, interleaving them with the null runs
// reported by run and bitRdr.
//
// At most min(remain, r.litCount, len(buf)) indexes are read, using buf as
// scratch space. It returns the number of indexes copied, the number of null
// slots passed to dc.FillZero, the validity run as it stands on return, and an
// error when too few indexes could be read, when dc.IsValid rejects them, or
// when dc.Copy fails. r.litCount is only reduced on success.
func (r *RleDecoder) consumeLiteralsFixedLenByteArray(dc DictionaryConverter, vals []parquet.FixedLenByteArray, remain int, buf []IndexType, run BitRun, bitRdr BitRunReader) (int, int, BitRun, error) {
	want := MinInt(remain, int(r.litCount))
	want = MinInt(want, len(buf))
	indexes := buf[:want]

	// The error result is discarded; a short read is detected by comparing
	// the returned count with the requested one.
	got, _ := r.r.GetBatchIndex(uint(r.bitWidth), indexes)
	if got != want {
		return 0, 0, run, xerrors.New("was not able to retrieve correct number of indexes")
	}
	if !dc.IsValid(indexes...) {
		return 0, 0, run, xerrors.New("invalid index values found for dictionary converter")
	}

	copied, nulls := 0, 0
	for copied < want {
		if !run.Set {
			// Null run: fill the whole run and move past it.
			gap := int(run.Len)
			dc.FillZero(vals[:gap])
			vals = vals[gap:]
			nulls += gap
			run.Len = 0
		} else {
			// Valid run: copy as many indexes as both the run and the
			// decoded chunk allow.
			step := MinInt(want-copied, int(run.Len))
			if err := dc.Copy(vals, indexes[copied:copied+step]); err != nil {
				return 0, 0, run, err
			}
			copied += step
			vals = vals[step:]
			run.Len -= int64(step)
		}

		if run.Len == 0 {
			run = bitRdr.NextRun()
		}
	}

	r.litCount -= int32(want)
	return copied, nulls, run, nil
}
// GetBatchWithDictFixedLenByteArray decodes up to len(vals) dictionary indexes
// and hands them to dc to populate vals.
//
// Repeated runs are written with dc.Fill; literal runs are read in chunks of
// at most len(indexbuffer) indexes and written with dc.Copy.
//
// It returns the number of values written. The count is short, with a nil
// error, when r.Next reports that no further run is available, when fewer
// indexes than requested could be read, or when dc.IsValid rejects an index.
// Errors reported by dc.Fill or dc.Copy are returned to the caller.
func (r *RleDecoder) GetBatchWithDictFixedLenByteArray(dc DictionaryConverter, vals []parquet.FixedLenByteArray) (int, error) {
	var indexbuffer [1024]IndexType

	read, size := 0, len(vals)
	for read < size {
		remain := size - read
		switch {
		case r.repCount > 0:
			idx := IndexType(r.curVal)
			if !dc.IsValid(idx) {
				return read, nil
			}
			batch := MinInt(remain, int(r.repCount))
			if err := dc.Fill(vals[:batch], idx); err != nil {
				return read, err
			}
			r.repCount -= int32(batch)
			read += batch
			vals = vals[batch:]
		case r.litCount > 0:
			litbatch := MinInt(MinInt(remain, int(r.litCount)), len(indexbuffer))
			buf := indexbuffer[:litbatch]
			// The error result is discarded; a short read is detected
			// through the returned count.
			n, _ := r.r.GetBatchIndex(uint(r.bitWidth), buf)
			if n != litbatch {
				return read, nil
			}
			if !dc.IsValid(buf...) {
				return read, nil
			}
			if err := dc.Copy(vals, buf); err != nil {
				// Report the failure rather than a silent short read.
				return read, err
			}
			r.litCount -= int32(litbatch)
			read += litbatch
			vals = vals[litbatch:]
		default:
			// Neither kind of run is pending; ask the decoder for the next.
			if !r.Next() {
				return read, nil
			}
		}
	}
	return read, nil
}